[DISCUSS] Behaviour of startNewChain() in Streaming

Aljoscha Krettek-2
Hi,
I think people will be confused by the behaviour of startNewChain() in
the streaming API. I had wrong assumptions about how it behaves myself
when I was writing a test job, and the only other streaming test written
by someone not coming from Streaming also makes wrong assumptions.
(That test is StreamCheckpointingITCase. I'm not saying that's a stupid
mistake; I made the same mistake myself.)

So what chains of operators should this snippet produce?

input
  .map(new Map1())
  .map(new Map2())
  .startNewChain()
  .map(new Map3())
  .print()

I would guess that your assumption about where the split in the chains
happens here is wrong. :D

Cheers,
Aljoscha

Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Gyula Fóra-2
I see your point, but this is a general problem with any property that we
set on the operators themselves. The same goes, for instance, for parallelism:

input
  .map(new Map1())
  .setParallelism(2)
  .map(new Map2())
  .print()

Do we change the parallelism after map 1 so it applies to map 2?

Gyula


(In reply to Aljoscha Krettek's message of Mon, May 25, 2015 at 10:26 AM.)

Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Aljoscha Krettek-2
Yes, this is another example where it might be problematic, but I think
there are two different ideas here. Methods such as setParallelism(),
name() and so on can be seen as modifying the operation that was
previously constructed. Methods such as groupBy() and startNewChain() can
be seen as acting at that point in the topology: groupBy() changes the
partitioning/grouping of the operations coming afterwards, and
startNewChain() starts a new chain "after" the call.

I know that this is also just my opinion and other people could see it
differently. This is a problem of our API, where the construction of
an operation is not encapsulated but scattered across many different
method calls.
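
To make the two readings concrete, here is a minimal sketch in plain Java.
It is not Flink code: ChainSketch, map(String), startNewChainOnPrevious()
and startNewChainAfter() are hypothetical names invented for illustration,
and the source and the print() sink are left out. The behaviour questioned
in this thread appears to correspond to the first reading; that mapping is
an assumption of the sketch, not something the code verifies.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a fluent pipeline builder; NOT the Flink API.
// All names are hypothetical and exist only to contrast the two readings.
class ChainSketch {
    private final List<String> names = new ArrayList<>();
    // heads.get(i) == true means operator i starts a new chain.
    private final List<Boolean> heads = new ArrayList<>();
    private boolean nextIsHead = false;

    // Adds an operator; it heads a new chain if a split was requested just before it.
    ChainSketch map(String name) {
        names.add(name);
        heads.add(nextIsHead);
        nextIsHead = false;
        return this;
    }

    // Reading 1: the call modifies the operator constructed just before it.
    ChainSketch startNewChainOnPrevious() {
        if (names.isEmpty()) {
            throw new IllegalStateException("no operator to mark");
        }
        heads.set(heads.size() - 1, true);
        return this;
    }

    // Reading 2: the call acts at this point in the topology,
    // so the next operator added heads the new chain.
    ChainSketch startNewChainAfter() {
        nextIsHead = true;
        return this;
    }

    // Groups operators into chains, opening a new chain at every head.
    List<List<String>> chains() {
        List<List<String>> result = new ArrayList<>();
        for (int i = 0; i < names.size(); i++) {
            if (result.isEmpty() || heads.get(i)) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(names.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> onPrevious = new ChainSketch()
            .map("Map1").map("Map2").startNewChainOnPrevious().map("Map3").chains();
        List<List<String>> after = new ChainSketch()
            .map("Map1").map("Map2").startNewChainAfter().map("Map3").chains();
        System.out.println(onPrevious); // [[Map1], [Map2, Map3]]
        System.out.println(after);      // [[Map1, Map2], [Map3]]
    }
}
```

Under the first reading the split lands before Map2, under the second it
lands before Map3, which is where most readers of the snippet would
expect it.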

(In reply to Gyula Fóra's message of Mon, May 25, 2015 at 10:37 AM.)

Re: [DISCUSS] Behaviour of startNewChain() in Streaming

Matthias J. Sax
I agree with Aljoscha's argument. It would be more intuitive if
"startNewChain()" split the chain where it is placed.


(In reply to Aljoscha Krettek's message of 05/25/2015 10:48 AM.)



Re: [DISCUSS] Behaviour of startNewChain() in Streaming

mxm
I second Aljoscha's and Matthias' opinion on the behavior of
`startNewChain()`. In the case of `setParallelism(..)`, we set the
parallelism of the operator, but in the case of `startNewChain()`, we
explicitly start a new chain. For the user, this is not connected to the
previous operation, even though the programmer sees it being called on
the operator itself. However, if the method were instead named
`breakChain()`, I'd be ok with it.

(In reply to Matthias J. Sax's message of Mon, May 25, 2015 at 10:48 PM.)