Intentional delta-iteration constraint or bug?

Intentional delta-iteration constraint or bug?

Jack David Galilee
Hi,


I know a similar problem to this was raised earlier last month on this list (archive: http://mail-archives.apache.org/mod_mbox/flink-dev/201406.mbox/browser). However, I have been unable to determine whether it was ever solved.


I am encountering the same problem "In the given plan, the solution set delta does not depend on the workset.", but what I can't ascertain (having examined the PACT compiler (0.5.1)) is whether this is a bug or an intentional design constraint placed on the delta iteration operator.


My algorithm sits between the Delta and Bulk iterative models as an Incremental iterative algorithm. The solution set is the union of all working sets up until the current working set is empty.
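To make the scheme concrete, here is a rough plain-Java sketch of the iteration I have in mind (independent of any Flink API; the step function and the values are invented purely for illustration):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

public class IncrementalIterationSketch {

    // Hypothetical superstep: each value maps to value / 2, dropping
    // zeros, so the workset eventually becomes empty.
    static Set<Integer> step(Set<Integer> workset) {
        Set<Integer> next = new TreeSet<>();
        for (int v : workset) {
            if (v / 2 > 0) {
                next.add(v / 2);
            }
        }
        return next;
    }

    // Iterate until the workset is empty; the result is the union of
    // all worksets produced along the way.
    static Set<Integer> run(Set<Integer> initialWorkset) {
        Set<Integer> workset = new TreeSet<>(initialWorkset);
        Set<Integer> solution = new TreeSet<>(workset);
        while (!workset.isEmpty()) {
            workset = step(workset);
            solution.addAll(workset);
        }
        return solution;
    }

    public static void main(String[] args) {
        System.out.println(run(new TreeSet<>(Arrays.asList(8, 12))));
        // prints [1, 2, 3, 4, 6, 8, 12]
    }
}
```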


The working set is broadcast to a single operator in the data flow. This appears to be the problem: the compiler is unable to detect the dependency through the broadcast.


To make things more complex, my data does not suit the pseudo-relational model Flink is designed around. I am dealing with variable-length sets/arrays, so I can't join against the solution set or working set between iterations because the data has no notion of keys.


I can make it 'run' as a BulkIteration, but the result is the final state (the empty working set): the 0.5.1 API, at least, doesn't allow all previous steps to be captured in a union, so I essentially lose the answer once the algorithm converges.


Your opinion as to whether this is actually a bug, or if I am doing it all completely wrong would be most appreciated.



Cheers,

Jack Galilee

Re: Intentional delta-iteration constraint or bug?

Fabian Hueske
Hi Jack,

the problem in the other thread was that, for testing purposes, a
collection data source was used as delta set. So the working set and the
delta set were not connected at all.

In your program the two are connected, but the connection goes through a
broadcast set, which the compiler does not detect.

I am not very familiar with the iterations code, so I leave the tricky
questions for somebody who knows the details.

Best, Fabian




2014-07-06 10:38 GMT+02:00 Jack David Galilee <[hidden email]>:


Re: Intentional delta-iteration constraint or bug?

Vasiliki Kalavri
Hi Jack,

regarding the "solution set delta does not depend on the workset" issue: in
delta iterations the state is maintained in the solution set, and the
workset serves as the input to the next iteration. The solution set delta
represents the changes that are merged into the state (the solution set)
at the end of each superstep. Therefore, the solution set delta needs to
depend on the input (the workset). However, as Fabian already said, this
dependency is not detected if it goes through a broadcast set. Why don't
you provide the workset as a regular input to your operator?

Regarding the notion of keys: the delta iteration assumes that each element
in the solution set is uniquely identified by a key, and this is how the
merge with the solution set delta happens after the end of a superstep.
One solution to your problem might be to create a new random key for each
element that you want to add to the solution set. Would that be possible?
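Sketched outside Flink, in plain Java (the element values are invented), attaching a fresh random key to each element would look something like this:

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class RandomKeySketch {

    // Attach a freshly generated random key to each element, so that
    // even identical values become distinct entries for the merge.
    static Map<UUID, String[]> keyElements(List<String[]> elements) {
        Map<UUID, String[]> keyed = new LinkedHashMap<>();
        for (String[] element : elements) {
            keyed.put(UUID.randomUUID(), element);
        }
        return keyed;
    }

    public static void main(String[] args) {
        List<String[]> elements = Arrays.asList(
                new String[]{"a", "b"},
                new String[]{"a", "b"},  // duplicate value, still gets its own key
                new String[]{"c"});
        System.out.println(keyElements(elements).size());  // prints 3
    }
}
```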

Cheers,
V.





On 7 July 2014 10:53, Fabian Hueske <[hidden email]> wrote:


Re: Intentional delta-iteration constraint or bug?

Stephan Ewen
Hey Jack!

Let me look into this. It should be okay to have the Solution Set Delta
depend on the Workset via a Broadcast Variable. If the system prohibits
that, it is not intentional.

If you can provide us with a minimal example that reproduces the error,
that would be great.

Greetings,
Stephan



On Thu, Jul 10, 2014 at 3:25 PM, Vasiliki Kalavri <[hidden email]> wrote:


Re: Intentional delta-iteration constraint or bug?

Stephan Ewen
Hi!

I have implemented a simple example where the solution set delta depends on the workset through a broadcast variable. It works both in the latest 0.6-SNAPSHOT and in 0.5.1. Attached is the rendered execution plan, which shows the workset going in only through a broadcast variable.

Maybe the problem is something different. Can you post your program?

Stephan




My sample program:


DataSet<Tuple2<Long, Long>> source =
        env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());

DataSet<Tuple2<Long, Long>> invariantInput =
        env.generateSequence(1, 1000).map(new DuplicateValueScalar<Long>());

// iteration from here
DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iter =
        source.iterateDelta(source, 1000, 1);

DataSet<Tuple2<Long, Long>> result = invariantInput
        .map(new IdentityMapper<Tuple2<Long, Long>>())
            .withBroadcastSet(iter.getWorkset(), "bc data")
        .join(iter.getSolutionSet()).where(0).equalTo(1)
            .projectFirst(1).projectSecond(1).types(Long.class, Long.class);

iter.closeWith(result.map(new IdentityMapper<Tuple2<Long, Long>>()), result).print();

OptimizedPlan p = compileNoStats(env.createProgramPlan());


On Thu, Jul 10, 2014 at 7:28 PM, Stephan Ewen <[hidden email]> wrote:



RE: Intentional delta-iteration constraint or bug?

Jack David Galilee
Hi Stephan, Vasiliki, Fabian,


Thanks so much for getting back to me so quickly. I apologise that I wasn't able to reply with the same speed; I have been waiting until I had a large enough chunk of time to work through each of your suggestions so that I could provide useful feedback.


Unfortunately I can't post my program at the moment.


Stephan, your example showed what I needed to do as a workaround; combined with Vasiliki's suggestion of randomly generated IDs, I was able to compile the algorithm.


What I had to do with my arrays was to use the string representation of the array as the ID, after a filter operation where I knew it would be unique in the dataset and across all iterations. This has led to a slightly odd arrangement in which I've needed to duplicate the data into Tuple2<String, String[]>, where the first field is the string representation of the second.
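In plain Java (with a standard map entry standing in for Flink's Tuple2, and the array contents invented), the duplication looks like this:

```java
import java.util.AbstractMap;
import java.util.Arrays;

public class ArrayKeySketch {

    // Pair a variable-length array with its string representation,
    // which then serves as the key.
    static AbstractMap.SimpleEntry<String, String[]> withKey(String[] data) {
        return new AbstractMap.SimpleEntry<>(Arrays.toString(data), data);
    }

    public static void main(String[] args) {
        System.out.println(withKey(new String[]{"x", "y", "z"}).getKey());
        // prints [x, y, z]
    }
}
```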


Unfortunately the algorithm still doesn't run correctly, but I'm not sure yet whether this is my fault. I'm going to keep working on it and will let you know if it turns out to be the iteration operator again.


I feel this is an interesting problem to solve (non-unique values and collection types) and would like to know if you agree. I saw from the survey you ran at the University of Berlin, posted on the mailing list, that the collection data type issue is one you're aware of. I'd also be quite interested in helping at the end of the year, when I have some more free time.



Cheers,


________________________________
From: [hidden email] <[hidden email]> on behalf of Stephan Ewen <[hidden email]>
Sent: 11 July 2014 05:17
To: [hidden email]
Subject: Re: Intentional delta-iteration constraint or bug?




Re: Intentional delta-iteration constraint or bug?

Ufuk Celebi
Hey Jack,

thanks for the update.

On 28 Jul 2014, at 03:22, Jack David Galilee <[hidden email]> wrote:
> What I had to do with my arrays was to use the string representation of the array as the ID, after a filter operation where I knew it would be unique in the dataset and across all iterations. This has led to a slightly odd arrangement in which I've needed to duplicate the data into Tuple2<String, String[]>, where the first field is the string representation of the second.

As you mentioned, this is a shortcoming at the moment, but we already have a PR in place, which is waiting to be tested. Your posting here confirms that we need to fix this ASAP.

> Unfortunately the algorithm still doesn't run correctly but I'm not sure if this is my fault or not yet. I'm going to keep working on it and will let you know if it turns out to be the iteration operator again.

If you feel differently about posting the program in the future, feel free to do so. You shouldn't worry about whether it is your fault or the system's; the goal is to make the system as easy to use as possible, so if something is unclear to you, we obviously have things to improve. ;-)

> I feel like this is an interesting problem to solve (non-unique values and collection types) and would like to know if you agree. I saw in the mailing list with a survey you did at University of Berlin that the collection data type issue is one you're aware of. I'd also be quite interested in helping at the end of the year when I have some more free time.

Sounds great! :)