Guarantees for object reuse modes and documentation

Fabian Hueske-2
Hi,



Flink's DataSet API features a configuration parameter called
enableObjectReuse(). If activated, Flink's runtime will create fewer
objects which results in better performance and lower garbage collection
overhead. Depending on whether the configuration switch is enabled or not,
user functions may or may not perform certain operations on objects they
receive from Flink or emit to Flink.
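
To make the trade-off concrete, here is a minimal plain-Java simulation (not Flink code). The `Record` type and the `run` helper are made up for this illustration; in Flink itself the switch is set on the job's ExecutionConfig, e.g. via `env.getConfig().enableObjectReuse()`. The sketch only counts how many instances a simulated runtime allocates with and without reuse.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Flink code. All names here are hypothetical.
class ReuseAllocationDemo {

    // Mutable record type used only for this illustration.
    static final class Record {
        int value;
    }

    // Result of one simulated run: the values the function saw and the
    // number of Record instances the simulated runtime allocated.
    static final class Run {
        final List<Integer> seen = new ArrayList<>();
        int allocations;
    }

    // Feeds `input` to a trivial "user function" that reads each record once.
    static Run run(int[] input, boolean objectReuse) {
        Run run = new Run();
        Record reusable = null;
        for (int v : input) {
            Record r;
            if (objectReuse && reusable != null) {
                r = reusable;          // overwrite the same instance
            } else {
                r = new Record();      // allocate a fresh instance
                run.allocations++;
                reusable = r;
            }
            r.value = v;
            run.seen.add(r.value);     // the function only reads the value
        }
        return run;
    }

    public static void main(String[] args) {
        int[] input = {1, 2, 3, 4};
        System.out.println("without reuse: " + run(input, false).allocations);
        System.out.println("with reuse:    " + run(input, true).allocations);
    }
}
```

A function that only reads its input sees the same values in both modes; the difference shows up only when a function keeps or emits references.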



At the moment, there are quite a few open issues and discussions going on
about the object reuse mode, including the JIRA issues FLINK-3333,
FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.



IMO, the most important issue is FLINK-3333 which is about improving the
documentation of the object reuse mode. The current version [1] is
ambiguous and includes details about operator chaining which are hard to
understand and to reason about for users. Hence it is not very clear which
guarantees Flink gives for objects in user functions under which
conditions. This documentation needs to be improved and I think this should
happen together with the 1.0 release.



Greg and Gabor proposed two new versions:

1. Greg's version [2] improves and clarifies the current documentation
without significantly changing the semantics. It also discusses operator
chaining, but gives more details.
2. Gabor's proposal [3] aims to make the discussion of object reuse
independent of operator chaining, which I think is a very good idea because
it is not transparent to the user when function chaining happens. Gabor
formulated four questions to answer what users can do with and expect from
objects that they received or emitted from a function. In order to make the
answers to these questions independent of function chaining and still keep
the contracts as defined by the current documentation, we have to default
to rather restrictive rules. For instance, functions must always emit new
object instances when object reuse is disabled. These strict rules
would, for example, also require DataSourceFunctions to copy all records
which they receive from an InputFormat (see FLINK-3335). IMO, the strict
guarantees make the disableObjectReuse mode harder to use and reason about
than the enableObjectReuse mode, whereas the opposite should be the case.
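
The "always emit new instances" rule can be illustrated with a small plain-Java simulation (not Flink code; `Record`, `CachingConsumer` and `run` are hypothetical names). It assumes a chained consumer that is called directly by the producer and keeps references to what it receives, which is the situation the strict rule is meant to make safe.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation, not Flink code. All names here are hypothetical.
class EmitNewInstancesDemo {

    static final class Record {
        int value;
        Record(int value) { this.value = value; }
    }

    // Simulated chained consumer: called directly by the producer, with no
    // serialization in between, and it keeps the references it receives.
    static final class CachingConsumer {
        final List<Record> cache = new ArrayList<>();
        void collect(Record r) { cache.add(r); }
    }

    // Producer that emits the values 1..n. If emitNewInstances is false it
    // mutates and re-emits a single instance.
    static List<Integer> run(int n, boolean emitNewInstances) {
        CachingConsumer consumer = new CachingConsumer();
        Record out = new Record(0);
        for (int i = 1; i <= n; i++) {
            if (emitNewInstances) {
                out = new Record(i);
            } else {
                out.value = i;
            }
            consumer.collect(out);
        }
        List<Integer> result = new ArrayList<>();
        for (Record r : consumer.cache) {
            result.add(r.value);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run(3, true));   // distinct instances
        System.out.println(run(3, false));  // one instance, cached three times
    }
}
```

With new instances the consumer's cache holds 1, 2, 3; with a re-emitted instance every cached entry shows the last value written.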



I would like to suggest a third option. Like Gabor, I think the rules
should be independent of function chaining, and I would like to break them
down into a handful of easy rules. However, I think we should loosen up the
guarantees for user functions under disableObjectReuse mode a bit.

Right now, the documentation states that under disableObjectReuse mode,
input objects are not changed across function calls. Hence users can
remember these objects across function calls and their value will not
change. I propose to give this guarantee only within function calls and
only for objects which are not emitted. Hence, this rule only applies to
functions that can consume multiple values through an iterator, such as
GroupReduce, CoGroup, or MapPartition. In disableObjectReuse mode,
these functions are allowed to remember the values, e.g., in a collection,
and their value will not change when the iterator is advanced. Once the
function call returns, the values might change. Since functions with
iterators cannot be directly chained, it will be safe to emit the same
object instance several times (hence FLINK-3335 would become invalid).
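
A rough plain-Java simulation of the proposed rule (not Flink code; `SimulatedRuntime`, `RememberingReducer` and `demo` are hypothetical). It assumes a runtime that hands out distinct instances within one group but recycles them for the next group, so values remembered during a call stay valid while a reference kept across calls does not.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java simulation, not Flink code. All names here are hypothetical.
class WithinCallGuaranteeDemo {

    static final class Record {
        int value;
    }

    // Hands out distinct instances within one group, but recycles the same
    // instances when the next group is served.
    static final class SimulatedRuntime {
        private final List<Record> pool = new ArrayList<>();

        Iterator<Record> group(int[] values) {
            List<Record> records = new ArrayList<>();
            for (int i = 0; i < values.length; i++) {
                if (i == pool.size()) {
                    pool.add(new Record());
                }
                Record r = pool.get(i);
                r.value = values[i];
                records.add(r);
            }
            return records.iterator();
        }
    }

    // GroupReduce-style function that remembers its inputs in a collection.
    static final class RememberingReducer {
        Record keptAcrossCalls;   // not covered by the proposed guarantee

        int reduce(Iterator<Record> values) {
            List<Record> remembered = new ArrayList<>();
            while (values.hasNext()) {
                remembered.add(values.next());   // safe within this call
            }
            keptAcrossCalls = remembered.get(0);
            int sum = 0;
            for (Record r : remembered) {
                sum += r.value;
            }
            return sum;
        }
    }

    // Returns {sum of group 1, kept value before call 2,
    //          sum of group 2, kept value after call 2}.
    static int[] demo() {
        SimulatedRuntime runtime = new SimulatedRuntime();
        RememberingReducer function = new RememberingReducer();
        int sum1 = function.reduce(runtime.group(new int[]{1, 2, 3}));
        Record fromFirstCall = function.keptAcrossCalls;
        int before = fromFirstCall.value;
        int sum2 = function.reduce(runtime.group(new int[]{10, 20}));
        int after = fromFirstCall.value;
        return new int[]{sum1, before, sum2, after};
    }

    public static void main(String[] args) {
        System.out.println(java.util.Arrays.toString(demo()));
    }
}
```

Both sums are correct because the remembered values are stable during each call; the reference held from the first call changes once the second group is served.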



The difference to the current guarantees is that input objects become
invalid after the function call returns. Since the disableObjectReuse
mode was mainly introduced to allow for caching objects across iterator
calls within a GroupReduceFunction or CoGroupFunction (not across function
calls), I think this is a reasonable restriction.



tl;dr:

If we want to make the documentation of object reuse independent of
chaining, we have to

- EITHER give tighter guarantees / be more restrictive than now and update
internals, which might lead to performance regressions. This would be in line
with the current documentation but would somewhat defeat the purpose of the
disableObjectReuse mode, IMO.

- OR give weaker guarantees, which breaks with the current documentation
but would not affect performance and would be easier to follow for users, IMO.


Greg and Gabor, please correct me if I did not get your points right or
missed something.

What do others think?


Fabian



[1]
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior

[2]
https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151

[3]
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg

Re: Guarantees for object reuse modes and documentation

Matthias J. Sax-2
Hi,

I like Fabian's proposal. The idea of object reuse is performance gain,
and we should not sacrifice this. Even more important is that the rules
are easy to understand!

-Matthias


On 02/17/2016 06:17 PM, Fabian Hueske wrote:

Re: Guarantees for object reuse modes and documentation

Fabian Hueske-2
Thanks Matthias.
Maybe I should clarify that I do not want to change the guarantees for the
enableObjectReuse mode, but for the disableObjectReuse mode.
The rules for the enableObjectReuse mode should remain the same.


2016-02-18 9:37 GMT+01:00 Matthias J. Sax <[hidden email]>:

Re: Guarantees for object reuse modes and documentation

Till Rohrmann
Judging from our chaining condition

ds.getPushChainDriverClass() != null &&
!(pred instanceof NAryUnionPlanNode) &&    // first op after union is stand-alone, because union is merged
!(pred instanceof BulkPartialSolutionPlanNode) &&    // partial solution merges anyways
!(pred instanceof WorksetPlanNode) &&    // workset merges anyways
!(pred instanceof IterationPlanNode) &&    // cannot chain with iteration heads currently
inConn.getShipStrategy() == ShipStrategyType.FORWARD &&
inConn.getLocalStrategy() == LocalStrategy.NONE &&
pred.getOutgoingChannels().size() == 1 &&
node.getParallelism() == pred.getParallelism() &&
node.getBroadcastInputs().isEmpty();

it is indeed hard for a newcomer to completely grasp when something is
chained and when not. Having exceptions to the rule, such as sources that
do not respect disableObjectReuse, makes it even harder to reason about.
What I’m wondering about is the actual performance impact of enforcing our
current guarantees in disableObjectReuse mode. Furthermore, I’m not sure
whether we have to tune the disableObjectReuse mode for performance,
considering that performance is the purpose of the enableObjectReuse mode.
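
As a simplified model of the condition quoted above (plain Java, not the optimizer's code), the sketch below restates it over a hypothetical `Candidate` class. The enums and field names are invented stand-ins for the plan-node and connection properties in the excerpt. It is only meant to show how a single changed property, such as parallelism or ship strategy, flips whether two operators are chained.

```java
// Simplified stand-alone model of the chaining condition. All types and
// fields are hypothetical stand-ins, not Flink's optimizer classes.
class ChainingConditionSketch {

    enum ShipStrategy { FORWARD, PARTITION_HASH, BROADCAST }
    enum LocalStrategy { NONE, SORT }
    enum PredecessorKind { REGULAR, UNION, BULK_PARTIAL_SOLUTION, WORKSET, ITERATION }

    // Defaults describe a candidate that satisfies every clause.
    static final class Candidate {
        boolean hasChainDriver = true;
        PredecessorKind predecessor = PredecessorKind.REGULAR;
        ShipStrategy ship = ShipStrategy.FORWARD;
        LocalStrategy local = LocalStrategy.NONE;
        int predecessorOutgoingChannels = 1;
        int parallelism = 4;
        int predecessorParallelism = 4;
        int broadcastInputs = 0;
    }

    // Mirrors the structure of the quoted condition, one clause per line.
    static boolean isChainable(Candidate c) {
        return c.hasChainDriver
            && c.predecessor == PredecessorKind.REGULAR
            && c.ship == ShipStrategy.FORWARD
            && c.local == LocalStrategy.NONE
            && c.predecessorOutgoingChannels == 1
            && c.parallelism == c.predecessorParallelism
            && c.broadcastInputs == 0;
    }

    public static void main(String[] args) {
        Candidate c = new Candidate();
        System.out.println(isChainable(c));
        c.parallelism = 8;                 // one changed property
        System.out.println(isChainable(c));
    }
}
```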

Cheers,
Till


On Thu, Feb 18, 2016 at 9:58 AM, Fabian Hueske <[hidden email]> wrote:

Re: Guarantees for object reuse modes and documentation

Greg Hogan
In reply to this post by Fabian Hueske-2
Hi Fabian,

I would only add to your citations Stephan's comment [1] concerning the
design, implementation, and use of object reuse.

I see two separate concerns addressed in code. First, as Stephan noted, for
certain classes deserialization is sufficiently expensive relative to
object creation and garbage collection that deserializing into new objects
is essentially free. If a user-defined function needs to copy inputs then
it is cheaper (potentially much cheaper) to perform this "copy" by
deserializing into a new object.
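
A minimal sketch of the two deserialization variants, using only java.io (not Flink's TypeSerializer; `Point` and both `deserialize` overloads are hypothetical). The point is that both variants do the same decoding work and differ only by one allocation, so a "copy" obtained by deserializing into a fresh object costs little extra.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Plain-Java illustration, not Flink's serializer stack.
class DeserializeCopyDemo {

    // Hypothetical record type.
    static final class Point {
        int x;
        int y;
    }

    static byte[] serialize(Point p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(p.x);
            out.writeInt(p.y);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Non-reusing variant: decodes into a fresh instance.
    static Point deserialize(byte[] data) {
        return deserialize(new Point(), data);
    }

    // Reusing variant: overwrites the fields of the given instance.
    static Point deserialize(Point reuse, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            reuse.x = in.readInt();
            reuse.y = in.readInt();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return reuse;
    }

    public static void main(String[] args) {
        Point p = new Point();
        p.x = 3;
        p.y = 4;
        byte[] data = serialize(p);
        Point fresh = deserialize(data);      // independent copy of p
        Point reused = deserialize(p, data);  // same instance as p
        System.out.println((fresh != p) + " " + (reused == p));
    }
}
```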

The second concern is defense against a user-defined function modifying
objects reused by the driver. For example, ChainedAllReduceDriver copies
intermediate outputs, and the non-reusing join iterators copy probe side
inputs. These extra copies are only made when object reuse is disabled.
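
The defensive-copy idea can be sketched in plain Java. This is loosely modeled on an all-reduce and is not the logic of ChainedAllReduceDriver; `Record`, `reduce` and `max` are hypothetical. It assumes an upstream that overwrites one shared instance per input and a user function that returns one of its arguments instead of a new object.

```java
import java.util.function.BinaryOperator;

// Plain-Java simulation, not Flink's driver code. All names are hypothetical.
class DefensiveCopyDemo {

    static final class Record {
        int value;
        Record(int value) { this.value = value; }
        Record copy() { return new Record(value); }
    }

    // Simulated reduce driver for a non-empty input. The upstream overwrites
    // a single shared instance for every input value.
    static int reduce(int[] input, BinaryOperator<Record> userFunction, boolean copyInputs) {
        Record shared = new Record(0);
        Record base = null;
        for (int v : input) {
            shared.value = v;
            // With copyInputs the driver isolates itself from the shared instance.
            Record in = copyInputs ? shared.copy() : shared;
            base = (base == null) ? in : userFunction.apply(base, in);
        }
        return base.value;
    }

    // User function that returns one of its inputs rather than a new object.
    static Record max(Record a, Record b) {
        return a.value >= b.value ? a : b;
    }

    public static void main(String[] args) {
        int[] input = {5, 3, 9, 1};
        System.out.println(reduce(input, DefensiveCopyDemo::max, true));
        System.out.println(reduce(input, DefensiveCopyDemo::max, false));
    }
}
```

With copies the maximum is 9. Without them the accumulator aliases the shared instance and ends up holding the last value written, 1.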

Object reuse can have a noticeable impact on performance, yet it is unused
in the Flink libraries and examples. Since object reuse is configured per
job, any shared functions must be documented or verified to allow object
reuse. Would it be preferable to make object reuse one or more annotations
on user-defined functions, which would signal to the driver that these extra
copies can be avoided and that reusable objects can be passed to
TypeSerializer? This would allow users to explicitly configure object reuse,
even when operators are chained.
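
One way the annotation idea might look, as a plain-Java sketch. The `@ReusesObjects` marker, the `MapFn` interface and `needsDefensiveCopies` are all hypothetical; no such annotation is proposed by name in this thread, and none is assumed to exist in Flink.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Sketch of a per-function opt-in. All names here are hypothetical.
class ReuseAnnotationSketch {

    // Marker stating that a function tolerates reused input objects.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ReusesObjects {}

    interface MapFn {
        int map(int[] in);
    }

    @ReusesObjects
    static final class SafeFn implements MapFn {
        public int map(int[] in) { return in[0] + 1; }
    }

    static final class PlainFn implements MapFn {
        public int map(int[] in) { return in[0] + 1; }
    }

    // What a driver could check before deciding on defensive copies.
    static boolean needsDefensiveCopies(MapFn fn) {
        return !fn.getClass().isAnnotationPresent(ReusesObjects.class);
    }

    public static void main(String[] args) {
        System.out.println(needsDefensiveCopies(new SafeFn()));
        System.out.println(needsDefensiveCopies(new PlainFn()));
    }
}
```

The decision is made per function class rather than per job, so a shared library function could opt in without affecting other functions in the same job.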

Greg

[1]
https://issues.apache.org/jira/browse/FLINK-3291?focusedCommentId=15144654


RE: Guarantees for object reuse modes and documentation

Ken Krugler
In reply to this post by Fabian Hueske-2
Not sure how useful this is, but we'd run into similar issues with Cascading over the years.

This wasn't an issue for input data, as Cascading "locks" the Tuple such that attempts to modify it will fail.

And in general Hadoop always re-uses the data container being passed to operations, so you quickly learn to not cache those :)

When trying to re-use a Tuple as the output in an operation, things get a bit more complicated.

If the Tuple only contains primitive types, then there's no issue as the (effectively) shallow copy created by the execution platform doesn't create a problem.

If the Tuple contains an object (e.g. a nested Tuple) then there were situations where a deep copy would need to be made before passing the Tuple to the operation's output collector.

For example, if the next (chained) operation was a map-side aggregator, then a shallow copy of the Tuple would be cached. If there's a non-primitive object then changes to this in the upstream operation obviously bork the cached data.

Net-net is that we wanted a way to find out, from inside an operation, whether we needed to make a deep copy of the output Tuple. But that doesn't exist (yet), so we have some utility code to check if a deep copy is needed (non-primitive types), and if so then it auto-clones the Tuple. Which isn't very efficient, but for most of our workflows we only have primitive types.
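
A sketch of such a check-then-clone utility in plain Java. This is not Cascading's API: a tuple is modeled as a `List<Object>`, nested tuples as nested lists, and `needsDeepCopy`, `deepCopy` and `prepareForEmit` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java sketch, not Cascading's API. A tuple is modeled as List<Object>.
class TupleCopyDemo {

    // Boxed primitives and strings are immutable, so sharing them is harmless.
    private static final Set<Class<?>> IMMUTABLE = new HashSet<>(Arrays.asList(
            Boolean.class, Byte.class, Short.class, Integer.class, Long.class,
            Float.class, Double.class, Character.class, String.class));

    static boolean isImmutableValue(Object o) {
        return o == null || IMMUTABLE.contains(o.getClass());
    }

    // True if any field could be modified through a shared reference.
    static boolean needsDeepCopy(List<Object> tuple) {
        for (Object field : tuple) {
            if (!isImmutableValue(field)) {
                return true;
            }
        }
        return false;
    }

    // Recursively copies nested tuples; immutable fields are shared.
    static List<Object> deepCopy(List<Object> tuple) {
        List<Object> copy = new ArrayList<>(tuple.size());
        for (Object field : tuple) {
            if (field instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> nested = (List<Object>) field;
                copy.add(deepCopy(nested));
            } else if (isImmutableValue(field)) {
                copy.add(field);
            } else {
                throw new IllegalArgumentException("unsupported field type: " + field.getClass());
            }
        }
        return copy;
    }

    // Clones only when required, so all-primitive tuples are passed through.
    static List<Object> prepareForEmit(List<Object> tuple) {
        return needsDeepCopy(tuple) ? deepCopy(tuple) : tuple;
    }

    public static void main(String[] args) {
        List<Object> inner = new ArrayList<>(Arrays.asList((Object) 1, "a"));
        List<Object> outer = new ArrayList<>(Arrays.asList((Object) 42, inner));
        List<Object> emitted = prepareForEmit(outer);
        inner.set(0, 99);              // later upstream change
        System.out.println(emitted);   // the emitted copy is unaffected
    }
}
```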

-- Ken


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr






Re: Guarantees for object reuse modes and documentation

Gábor Gévay
Thanks, Ken! I was wondering how other systems handle these issues.

Fortunately, the deep copy - shallow copy problem doesn't arise in
Flink: when we copy an object, it is always a deep copy (at least, I
hope so :)).

Best,
Gábor




Re: Guarantees for object reuse modes and documentation

Fabian Hueske-2
Hi everybody,

thanks for your input.

I sketched a proposal for updated object-reuse semantics and documentation,
based on Gabor's proposal (1), Greg's input, and the changed semantics that
I discussed earlier in this thread.

-->
https://docs.google.com/document/d/1jpPr2UuWlqq1iIDIo_1kmPL9QjA-sXAC9wkj-hE4PAc/edit#

Looking forward to your comments.

Fabian
(1)
https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit


Re: Guarantees for object reuse modes and documentation

Fabian Hueske-2
Regarding the scope of the object-reuse setting, I agree with Greg.
It would be very nice if we could specify the object-reuse mode for each
user function.

Greg, do you want to open a JIRA for that so that we can continue the
discussion there?


Re: Guarantees for object reuse modes and documentation

Fabian Hueske-2
Gabor and Greg gave some good comments on the proposal.

If there is no more feedback, I'll go ahead and open a PR to update the
documentation tomorrow.

Thanks, Fabian

2016-02-24 12:24 GMT+01:00 Fabian Hueske <[hidden email]>:

> Regarding the scope of the object-reuse setting, I agree with Greg.
> It would be very nice if we could specify the object-reuse mode for each
> user function.
>
> Greg, do you want to open a JIRA for that such that we can continue the
> discussion there?
>
> 2016-02-24 12:07 GMT+01:00 Fabian Hueske <[hidden email]>:
>
>> Hi everybody,
>>
>> thanks for your input.
>>
>> I sketched a proposal for updated object-reuse semantics and
>> documentation, based on Gabor's proposal (1), Greg's input, and the changed
>> semantics that I discussed earlier in this thread.
>>
>> -->
>> https://docs.google.com/document/d/1jpPr2UuWlqq1iIDIo_1kmPL9QjA-sXAC9wkj-hE4PAc/edit#
>>
>> Looking forward to your comments.
>>
>> Fabian
>> (1)
>> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg/edit
>>
>>

RE: Guarantees for object reuse modes and documentation

Ken Krugler
In reply to this post by Gábor Gévay
Hi Gábor,

When object re-use is enabled, what happens when maps are chained?

In Cascading, when running in local mode, the tuple that one map operation outputs is immediately used as the input for the next map.

The upstream map isn't called with another input tuple until the pipelining is done, which prevents a problem with output tuple re-use.
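
To make this model concrete, here is a minimal sketch in plain Java (it is not Cascading or Flink code). `Box`, `upstreamMap`, `pipelined`, and `buffered` are hypothetical names invented for illustration. The upstream map writes every result into one reused object and calls the downstream function before it reads the next input, so a downstream map that consumes the object right away sees correct values, while an operator that only holds on to the reference does not.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical mutable record; stands in for a Cascading Tuple or a Flink value type. */
final class Box {
    int value;
}

final class ChainedMapSketch {

    /**
     * Upstream map: writes every result into ONE reused Box and hands it
     * downstream immediately. The Box is not touched again until the
     * downstream call has returned.
     */
    static void upstreamMap(int[] inputs, Consumer<Box> downstream) {
        Box reused = new Box();
        for (int in : inputs) {
            reused.value = in * 2;
            downstream.accept(reused); // the rest of the chain runs before the next input
        }
    }

    /** Downstream map that consumes its input right away: safe with output reuse. */
    static List<Integer> pipelined(int[] inputs) {
        List<Integer> out = new ArrayList<>();
        upstreamMap(inputs, box -> out.add(box.value + 1));
        return out;
    }

    /** Downstream operator that keeps references for later: broken by output reuse. */
    static List<Integer> buffered(int[] inputs) {
        List<Box> held = new ArrayList<>();
        upstreamMap(inputs, held::add);
        List<Integer> out = new ArrayList<>();
        for (Box b : held) {
            out.add(b.value + 1);
        }
        return out;
    }

    public static void main(String[] args) {
        int[] inputs = {1, 2, 3};
        System.out.println(pipelined(inputs)); // [3, 5, 7]
        System.out.println(buffered(inputs));  // [7, 7, 7]
    }
}
```

With inputs 1, 2, 3 the pipelined chain yields [3, 5, 7], whereas the buffering variant yields [7, 7, 7] because all held references point at the same object.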

Is this the same model used by Flink?

Thanks for clarifying,

-- Ken

> From: Gábor Gévay
> Sent: February 20, 2016 4:04:09am PST
> To: [hidden email]
> Subject: Re: Guarantees for object reuse modes and documentation
>
> Thanks, Ken! I was wondering how other systems handle these issues.
>
> Fortunately, the deep copy - shallow copy problem doesn't arise in
> Flink: when we copy an object, it is always a deep copy (at least, I
> hope so :)).
>
> Best,
> Gábor
>
>
>
> 2016-02-19 22:29 GMT+01:00 Ken Krugler <[hidden email]>:
>> Not sure how useful this is, but we'd run into similar issues with Cascading over the years.
>>
>> This wasn't an issue for input data, as Cascading "locks" the Tuple such that attempts to modify it will fail.
>>
>> And in general Hadoop always re-uses the data container being passed to operations, so you quickly learn to not cache those :)
>>
>> When trying to re-use a Tuple as the output in an operation, things get a bit more complicated.
>>
>> If the Tuple only contains primitive types, then there's no issue as the (effectively) shallow copy created by the execution platform doesn't create a problem.
>>
>> If the Tuple contains an object (e.g. a nested Tuple) then there were situations where a deep copy would need to be made before passing the Tuple to the operation's output collector.
>>
>> For example, if the next (chained) operation was a map-side aggregator, then a shallow copy of the Tuple would be cached. If there's a non-primitive object then changes to this in the upstream operation obviously bork the cached data.
>>
>> Net-net is that we wanted a way to find out, from inside an operation, whether we needed to make a deep copy of the output Tuple. But that doesn't exist (yet), so we have some utility code to check if a deep copy is needed (non-primitive types), and if so then it auto-clones the Tuple. Which isn't very efficient, but for most of our workflows we only have primitive types.
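
A rough sketch of the shallow versus deep copy issue described above, in plain Java rather than Cascading's API. A tuple is modeled as a `List<Object>` and a nested tuple as a nested list; `TupleCopySketch`, `needsDeepCopy`, and `safeCopy` are made-up names, and non-list fields are assumed to be immutable.

```java
import java.util.ArrayList;
import java.util.List;

final class TupleCopySketch {

    /** Shallow copy: a new outer list that still shares the nested field objects. */
    static List<Object> shallowCopy(List<Object> tuple) {
        return new ArrayList<>(tuple);
    }

    /** Deep copy: nested lists are copied recursively; other fields are assumed immutable. */
    static List<Object> deepCopy(List<Object> tuple) {
        List<Object> copy = new ArrayList<>(tuple.size());
        for (Object field : tuple) {
            if (field instanceof List) {
                @SuppressWarnings("unchecked")
                List<Object> nested = (List<Object>) field;
                copy.add(deepCopy(nested));
            } else {
                copy.add(field);
            }
        }
        return copy;
    }

    /** True if any field is a nested tuple, i.e. a shallow copy would share state. */
    static boolean needsDeepCopy(List<Object> tuple) {
        for (Object field : tuple) {
            if (field instanceof List) {
                return true;
            }
        }
        return false;
    }

    /** Copies as cheaply as is safe: deep only when a nested tuple is present. */
    static List<Object> safeCopy(List<Object> tuple) {
        return needsDeepCopy(tuple) ? deepCopy(tuple) : shallowCopy(tuple);
    }

    public static void main(String[] args) {
        List<Object> nested = new ArrayList<>();
        nested.add(1);
        List<Object> tuple = new ArrayList<>();
        tuple.add("key");
        tuple.add(nested);

        List<Object> cachedShallow = shallowCopy(tuple);
        List<Object> cachedSafe = safeCopy(tuple);

        nested.set(0, 99); // the upstream operation changes the nested object

        System.out.println(cachedShallow); // [key, [99]]  (cached data is corrupted)
        System.out.println(cachedSafe);    // [key, [1]]   (cached data is intact)
    }
}
```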
>>
>> -- Ken
>>
>>> From: Fabian Hueske
>>> Sent: February 17, 2016 9:17:27am PST
>>> To: [hidden email]
>>> Subject: Guarantees for object reuse modes and documentation
>>>
>>> Hi,
>>>
>>>
>>>
>>> Flink's DataSet API features a configuration parameter called
>>> enableObjectReuse(). If activated, Flink's runtime will create fewer
>>> objects which results in better performance and lower garbage collection
>>> overhead. Depending on whether the configuration switch is enabled or not,
>>> user functions may or may not perform certain operations on objects they
>>> receive from Flink or emit to Flink.
>>>
>>>
>>>
>>> At the moment, there are quite a few open issues and discussions going on
>>> about the object reuse mode, including the JIRA issues FLINK-3333,
>>> FLINK-1521, FLINK-3335, FLINK-3340, FLINK-3394, and FLINK-3291.
>>>
>>>
>>>
>>> IMO, the most important issue is FLINK-3333 which is about improving the
>>> documentation of the object reuse mode. The current version [1] is
>>> ambiguous and includes details about operator chaining which are hard to
>>> understand and to reason about for users. Hence it is not very clear which
>>> guarantees Flink gives for objects in user functions under which
>>> conditions. This documentation needs to be improved and I think this should
>>> happen together with the 1.0 release.
>>>
>>>
>>>
>>> Greg and Gabor proposed two new versions:
>>>
>>> 1. Greg's version [2]  improves and clarifies the current documentation
>>> without significantly changing the semantics. It also discusses operator
>>> chaining, but gives more details.
>>> 2. Gabor's proposal [3] aims to make the discussion of object reuse
>>> independent of operator chaining which I think is a very good idea because
>>> it is not transparent to the user when function chaining happens. Gabor
>>> formulated four questions to answer what users can do with and expect from
>>> objects that they received or emitted from a function. In order to make the
>>> answers to these questions independent of function chaining and still keep
>>> the contracts as defined by the current documentation, we have to default
>>> to rather restrictive rules. For instance, functions must always emit new
>>> object instances in case of disabled object reuse mode. These strict rules
>>> would for example also require DataSourceFunctions to copy all records
>>> which they receive from an InputFormat (see FLINK-3335). IMO, the strict
>>> guarantees make the disableObjectReuse mode harder to use and reason about
>>> than the enableObjectReuse mode whereas the opposite should be the case.
>>>
>>>
>>>
>>> I would like to suggest a third option. Similar to Gabor, I think the rules
>>> should be independent of function chaining and I would like to break them
>>> down into a handful of easy rules. However, I think we should loosen up the
>>> guarantees for user functions under disableObjectReuse mode a bit.
>>>
>>> Right now, the documentation states that under disableObjectReuse mode,
>>> input objects are not changed across function calls. Hence users can
>>> remember these objects across function calls and their value will not
>>> change. I propose to give this guarantee only within function calls and
>>> only for objects which are not emitted. Hence, this rule only applies for
>>> functions that can consume multiple values through an iterator such as
>>> GroupReduce, CoGroup, or MapPartition. In disableObjectReuse mode,
>>> these functions are allowed to remember the values, e.g., in a collection,
>>> and their value will not change when the iterator is forwarded. Once the
>>> function call returns, the values might change. Since functions with
>>> iterators cannot be directly chained, it will be safe to emit the same
>>> object instance several times (hence FLINK-3335 would become invalid).
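
The proposed guarantee can be illustrated with a small plain-Java sketch (no Flink classes involved). `Rec`, `groupIterator`, and `rememberAll` are hypothetical names, and the `reuse` flag only mimics the two modes; it is not how Flink implements them. The function remembers every value from the iterator in a collection and reads the values afterwards, which is only safe when the iterator hands out a fresh object per element.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical mutable record type. */
final class Rec {
    int value;

    Rec(int value) {
        this.value = value;
    }
}

final class GroupIteratorSketch {

    /** Iterator over one group; with reuse it overwrites and returns a single shared Rec. */
    static Iterator<Rec> groupIterator(int[] values, boolean reuse) {
        return new Iterator<Rec>() {
            private final Rec shared = new Rec(0);
            private int pos = 0;

            @Override
            public boolean hasNext() {
                return pos < values.length;
            }

            @Override
            public Rec next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                int v = values[pos++];
                if (reuse) {
                    shared.value = v; // same instance every time
                    return shared;
                }
                return new Rec(v);    // fresh instance per element
            }
        };
    }

    /** A group function that remembers all values first and reads them afterwards. */
    static List<Integer> rememberAll(Iterator<Rec> group) {
        List<Rec> remembered = new ArrayList<>();
        while (group.hasNext()) {
            remembered.add(group.next());
        }
        List<Integer> seen = new ArrayList<>();
        for (Rec r : remembered) {
            seen.add(r.value);
        }
        return seen;
    }

    public static void main(String[] args) {
        int[] group = {1, 2, 3};
        System.out.println(rememberAll(groupIterator(group, false))); // [1, 2, 3]
        System.out.println(rememberAll(groupIterator(group, true)));  // [3, 3, 3]
    }
}
```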
>>>
>>>
>>>
>>> The difference to the current guarantees is that input objects become
>>> invalid after the function call has returned. Since the disableObjectReuse
>>> mode was mainly introduced to allow for caching objects across iterator
>>> calls within a GroupReduceFunction or CoGroupFunction (not across function
>>> calls), I think this is a reasonable restriction.
>>>
>>>
>>>
>>> tl;dr:
>>>
>>> If we want to make the documentation of object reuse independent of
>>> chaining we have to
>>>
>>> - EITHER, give tighter guarantees / be more restrictive than now and update
>>> internals which might lead to performance regression. This would be in line
>>> with the current documentation but somewhat defeat the purpose of the
>>> disableObjectReuse mode, IMO.
>>>
>>> - OR, give weaker guarantees, which breaks with the current documentation,
>>> but would not affect performance and would be easier to follow for users, IMO.
>>>
>>>
>>> Greg and Gabor, please correct me if I did not get your points right or
>>> missed something.
>>>
>>> What do others think?
>>>
>>>
>>> Fabian
>>>
>>>
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/index.html#object-reuse-behavior
>>>
>>> [2]
>>> https://issues.apache.org/jira/browse/FLINK-3333?focusedCommentId=15139151
>>>
>>> [3]
>>> https://docs.google.com/document/d/1cgkuttvmj4jUonG7E2RdFVjKlfQDm_hE6gvFcgAfzXg


--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big data solutions & training
Hadoop, Cascading, Cassandra & Solr






RE: Guarantees for object reuse modes and documentation

Fabian Hueske-2
Hi Ken,

Yes, that's exactly how it works in Flink as well.

The object reuse mode does not affect how chaining is done.

Best, Fabian



From: Ken Krugler
Sent: Friday, March 4, 2016 01:54
To: [hidden email]
Subject: RE: Guarantees for object reuse modes and documentation

Hi Gábor,

When object re-use is enabled, what happens when maps are chained?

In Cascading, when running in local mode, the tuple that one map operation outputs is immediately used as the input for the next map.

The upstream map isn't called with another input tuple until the pipelining is done, which prevents a problem with output tuple re-use.

Is this the same model used by Flink?

Thanks for clarifying,

-- Ken
