[Gelly]Distributed Minimum Spanning Tree Example

[Gelly]Distributed Minimum Spanning Tree Example

Andra Lungu
Hey guys,

As I previously said, I have had some problems getting this DMST algorithm
to be fully functional, either with Flink 0.8 or with Flink 0.9.

My latest problem (and I have been debugging this for quite a few days) was
that, for the test I wrote that extends MultipleProgramsTestBase, the
Execution mode = CLUSTER test had a very non-deterministic behaviour (i.e.
each time it produced a different number of Actual lines, which were,
obviously, not equal to the number of Expected lines).


The function that caused problems has the following header:

public DataSet<Tuple5<Long, Long, Double, Long, Long>>
updateRootIdsForRealEdges(
        DataSet<Tuple5<Long, Long, Double, Long, Long>> edges,
        DataSet<Vertex<Long, Long>> verticesWithRootIDs) {

After some classical "printf debugging", I saw that, when performing a join
on the

DataSet<Tuple5<Long, Long, Double, Long, Long>> edges

it did not always act as if it had all the values you would expect it to
have.

The code snippet that solved the problem raises some questions for me:

DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
        edges.map(new MapFunction<Tuple5<Long, Long, Double, Long, Long>,
                Tuple5<Long, Long, Double, Long, Long>>() {

            @Override
            public Tuple5<Long, Long, Double, Long, Long> map(
                    Tuple5<Long, Long, Double, Long, Long> edge) throws Exception {
                // identity map: returns its input unchanged
                return edge;
            }
        });

or

DataSet<Tuple5<Long, Long, Double, Long, Long>> rebalancedEdges =
        edges.rebalance();

and then the join would be performed on those rebalancedEdges.
Now the test passes (with either of the two solutions).

So the questions are:
Why is this happening? Is this normal?
Maybe it has something to do with the execution context, but then how come
a simple identity map fixes everything?

I am sorry if this may seem like one of those "Why is the sky blue?"
questions, but I am here to learn :D

Thank you!
Andra

Re: [Gelly]Distributed Minimum Spanning Tree Example

Fabian Hueske-2
Hi Andra,

I haven't had a detailed look at Gelly and its functions, but Flink has
only a few operators that can cause non-deterministic behavior.
In general, user code should be implemented without side effects, i.e., the
result of each function call may only depend on its arguments. This
principle gives Flink the freedom to perform function calls in any order on
any machine.
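To make the side-effect point concrete, here is a small stdlib-only Java sketch (not Flink code; doubleIt and tagWithCounter are made-up functions). A pure function yields the same overall result no matter in which order the records are processed, while a function that reads hidden mutable state does not:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SideEffectDemo {
    // Pure: the result depends only on the argument, so call order never matters.
    static long doubleIt(long x) {
        return 2 * x;
    }

    // Impure: the result also depends on hidden mutable state,
    // so the order of calls changes the output.
    static long counter = 0;
    static long tagWithCounter(long x) {
        return x + (counter++);
    }

    public static void main(String[] args) {
        List<Long> data = Arrays.asList(1L, 2L, 3L, 4L);
        List<Long> shuffled = Arrays.asList(3L, 1L, 4L, 2L); // same data, different order

        // Pure function: any processing order yields the same (sorted) result.
        List<Long> a = data.stream().map(SideEffectDemo::doubleIt)
                .sorted().collect(Collectors.toList());
        List<Long> b = shuffled.stream().map(SideEffectDemo::doubleIt)
                .sorted().collect(Collectors.toList());
        System.out.println(a.equals(b));  // true

        // Impure function: the processing order leaks into the result.
        counter = 0;
        List<Long> c = data.stream().map(SideEffectDemo::tagWithCounter)
                .sorted().collect(Collectors.toList());
        counter = 0;
        List<Long> d = shuffled.stream().map(SideEffectDemo::tagWithCounter)
                .sorted().collect(Collectors.toList());
        System.out.println(c.equals(d));  // false
    }
}
```

Since Flink may run your UDFs in any order on any machine, only the first kind of function gives reproducible results.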
The only built-in exception is the mapPartition operator, which receives a
whole partition as input; the partitions themselves are not deterministically
computed and depend on input split assignment and optimizer strategy
choices.
Another source of non-deterministic results can be incorrect semantic
properties, which can make the optimizer believe that data is already
sorted or partitioned while it is not. In the case of your example, an
explicit rebalance operation could reset these assumed properties and force
the optimizer to deterministically reorganize the data.
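To illustrate why wrongly assumed sort order can silently lose rows, here is a stdlib-only sketch (made-up data, not the Gelly code) of a textbook merge join. It is only correct when both inputs are sorted by key; if a planner picks it based on a sortedness assumption that does not hold, matches simply disappear, which matches the "fewer Actual lines" symptom:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MergeJoinDemo {
    // Merge join on (key, value) pairs (distinct keys for simplicity).
    // Precondition: BOTH inputs sorted ascending by key.
    // Violating the precondition drops matches without any error.
    static List<long[]> mergeJoin(List<long[]> left, List<long[]> right) {
        List<long[]> out = new ArrayList<>();
        int i = 0, j = 0;
        while (i < left.size() && j < right.size()) {
            long lk = left.get(i)[0], rk = right.get(j)[0];
            if (lk < rk) {
                i++;                       // advance the smaller side
            } else if (lk > rk) {
                j++;
            } else {
                out.add(new long[]{lk, left.get(i)[1], right.get(j)[1]});
                i++;
                j++;
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> sortedLeft = Arrays.asList(
                new long[]{1, 10}, new long[]{2, 20}, new long[]{3, 30});
        List<long[]> unsortedLeft = Arrays.asList(
                new long[]{3, 30}, new long[]{1, 10}, new long[]{2, 20});
        List<long[]> right = Arrays.asList(
                new long[]{1, 100}, new long[]{2, 200}, new long[]{3, 300});

        // Precondition holds: all 3 matches are found.
        System.out.println(mergeJoin(sortedLeft, right).size());   // 3
        // Precondition violated: matches are silently lost.
        System.out.println(mergeJoin(unsortedLeft, right).size()); // 1
    }
}
```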

I would have a look at the execution plans for both variants (the
non-deterministic and the deterministic one).
You can get them as a JSON string by calling
ExecutionEnvironment.getExecutionPlan().

Best, Fabian





2015-02-14 18:55 GMT+01:00 Andra Lungu <[hidden email]>:


Re: [Gelly]Distributed Minimum Spanning Tree Example

Stephan Ewen
I think Fabian is pointing in a good direction.

@Andra, are you using a mapPartition() operation? If yes, it really is
non-deterministic, unless you explicitly call a partitioning operation
before it (rebalancing being a special case thereof).
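To make that concrete, here is a stdlib-only sketch (made-up data, not Flink code) of why a per-partition computation is only deterministic once the partition assignment itself is: round-robin placement depends on the arrival order of the records, while hash partitioning depends only on the record:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionDemo {
    // Round-robin: the partition a record lands in depends on arrival order.
    static List<List<Long>> roundRobin(List<Long> in, int n) {
        List<List<Long>> parts = new ArrayList<>();
        for (int p = 0; p < n; p++) parts.add(new ArrayList<>());
        for (int i = 0; i < in.size(); i++) parts.get(i % n).add(in.get(i));
        return parts;
    }

    // Hash partitioning: the partition depends only on the record itself.
    static List<List<Long>> byHash(List<Long> in, int n) {
        List<List<Long>> parts = new ArrayList<>();
        for (int p = 0; p < n; p++) parts.add(new ArrayList<>());
        for (long v : in) parts.get((int) Math.floorMod(v, (long) n)).add(v);
        return parts;
    }

    // A per-partition result, like what a mapPartition UDF would compute.
    static List<Long> sumPerPartition(List<List<Long>> parts) {
        List<Long> sums = new ArrayList<>();
        for (List<Long> p : parts) sums.add(p.stream().mapToLong(Long::longValue).sum());
        return sums;
    }

    public static void main(String[] args) {
        List<Long> orderA = Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L);
        List<Long> orderB = Arrays.asList(6L, 5L, 4L, 3L, 2L, 1L); // same data, reordered

        // Round-robin: partition 0's result changes with the input order.
        System.out.println(sumPerPartition(roundRobin(orderA, 2)));  // [9, 12]
        System.out.println(sumPerPartition(roundRobin(orderB, 2)));  // [12, 9]

        // Hash partitioning: per-partition results are stable across orders.
        System.out.println(sumPerPartition(byHash(orderA, 2)));  // [12, 9]
        System.out.println(sumPerPartition(byHash(orderB, 2)));  // [12, 9]
    }
}
```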

If you are not using it, can you point us to the code and give us a bit
more input, like which operations you are applying and which data set you
are using?

Thanks,
Stephan


On Sat, Feb 14, 2015 at 9:17 PM, Fabian Hueske <[hidden email]> wrote:
