|
Hi,
I think people will be confused by the behaviour of startNewChain() in
the streaming API. I myself had wrong assumptions about how it behaves
when I was writing a test job, and the only other job where someone not
coming from Streaming wrote a streaming test also makes wrong
assumptions. (StreamCheckpointingITCase. I'm not saying that's a
stupid mistake; I made the same mistake myself.)

So what chains of operators should this snippet produce:

input
    .map(new Map1())
    .map(new Map2())
    .startNewChain()
    .map(new Map3())
    .print()

I would guess that your assumption about where the split in the chains
happens here is wrong. :D

Cheers,
Aljoscha
|
I see your point, but this is a general problem with any property that we
set on the operators themselves. The same goes, for instance, for parallelism:

input
    .map(new Map1())
    .setParallelism(2)
    .map(new Map2())
    .print()

Do we change the parallelism after Map1 so that it applies to Map2?

Gyula

On Mon, May 25, 2015 at 10:26 AM, Aljoscha Krettek <[hidden email]> wrote:
> [...]
|
Yes, this is another example where it might be problematic but I think
there are different ideas here:

Methods such as setParallelism(), name() and so on can be seen as
modifying the operation that was previously constructed.

Methods such as groupBy() and startNewChain() can be seen as acting at
that point in the topology: groupBy() changes the partitioning/grouping
of the operations coming afterwards, and startNewChain() starts a new
chain "after" the call.

I know that this is also just my opinion and other people could see it
differently. This is a problem of our API, where the construction of
an operation is not encapsulated but scattered across many different
method calls.

On Mon, May 25, 2015 at 10:37 AM, Gyula Fóra <[hidden email]> wrote:
> [...]
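To make the two readings concrete, here is a minimal toy model in plain Java. It is not Flink code and does not use the streaming API: ChainSketch, Reading and op() are hypothetical names invented for this illustration, and operators are just strings. It only shows which chains each reading would produce for the Map1/Map2/Map3 snippet from the start of the thread. Judging from the discussion, the first reading (the call modifies the operator it is invoked on) appears to be what the API did at the time, but treat that as an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical names, no Flink dependency.
class ChainSketch {
    // The two interpretations discussed in the thread.
    enum Reading { MODIFIES_PREVIOUS_OPERATOR, SPLITS_AT_CALL_SITE }

    private final Reading reading;
    private final List<List<String>> chains = new ArrayList<>();
    private boolean splitBeforeNext = false;

    ChainSketch(Reading reading) {
        this.reading = reading;
        chains.add(new ArrayList<>());
    }

    // Appends an operator; opens a new chain first if a split is pending.
    ChainSketch op(String name) {
        if (splitBeforeNext && !current().isEmpty()) {
            chains.add(new ArrayList<>());
        }
        splitBeforeNext = false;
        current().add(name);
        return this;
    }

    ChainSketch startNewChain() {
        if (reading == Reading.SPLITS_AT_CALL_SITE) {
            // The next operator added becomes the head of a new chain.
            splitBeforeNext = true;
        } else {
            // The operator the call is made on becomes the head of a new chain.
            List<String> cur = current();
            if (cur.size() > 1) {
                String last = cur.remove(cur.size() - 1);
                List<String> fresh = new ArrayList<>();
                fresh.add(last);
                chains.add(fresh);
            }
        }
        return this;
    }

    private List<String> current() {
        return chains.get(chains.size() - 1);
    }

    List<List<String>> chains() {
        return chains;
    }

    // The snippet from the thread: map, map, startNewChain, map, print.
    static List<List<String>> example(Reading reading) {
        return new ChainSketch(reading)
                .op("Map1").op("Map2").startNewChain().op("Map3").op("Print")
                .chains();
    }

    public static void main(String[] args) {
        for (Reading reading : Reading.values()) {
            System.out.println(reading + " -> " + example(reading));
        }
    }
}
```

Under MODIFIES_PREVIOUS_OPERATOR the model yields [[Map1], [Map2, Map3, Print]], so the split sits before Map2. Under SPLITS_AT_CALL_SITE it yields [[Map1, Map2], [Map3, Print]], which is the split most readers of the snippet seem to expect.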
|
I agree with Aljoscha's argumentation. It would be more intuitive if
"startNewChain()" split the chain where it is put.

On 05/25/2015 10:48 AM, Aljoscha Krettek wrote:
> [...]
|
I second Aljoscha's and Matthias' opinion on the behavior of
`startNewChain()`. In the case of `setParallelism(..)`, we set the
parallelism of the operator, but in the case of `startNewChain()`, we
explicitly start a new chain; for the user, this is not connected to the
previous operation, even though the programmer sees it being called on
the operator itself.

However, if the method were instead named `breakChain()`, I'd be OK with it.

On Mon, May 25, 2015 at 10:48 PM, Matthias J. Sax <[hidden email]> wrote:
> [...]
