Gather a distributed dataset

aalexandrov
Hi there,

I wished for intermediate datasets, and Santa Ufuk made my wishes come true
(thank you, Santa)!

Now that FLINK-986 is in the mainline, I want to ask some practical
questions.

In Spark, there is a way to put a value from the local driver to the
distributed runtime via

val x = env.parallelize(...) // expose x to the distributed runtime
val y = dataflow(env, x) // y is produced by a dataflow which reads from x

and also to get a value from the distributed runtime back to the driver

val z = y.collect()

As far as I know, in Flink we have an equivalent for parallelize

val x = env.fromCollection(...)

but not for collect. Is this still the case?
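
To make it concrete, here is a sketch of the missing direction (the
collect() call below is hypothetical and does not exist in Flink yet):

val env = ExecutionEnvironment.getExecutionEnvironment
val x = env.fromCollection(1 to 10)   // driver -> distributed runtime
val y = x.map(_ * 2)                  // a dataflow that reads from x
val z: Seq[Int] = y.collect()         // runtime -> driver (the missing piece)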

If yes, how hard would it be to add this feature at the moment? Can you
give me some pointers?

Regards,

Alexander

RE: Gather a distributed dataset

Paris Carbone
Hello Alexander,

Intermediate results are indeed looking promising, also for finally implementing a proper flink-shell for exploratory data analysis.

We are also looking into how to implement a collect() for the flink-streaming Scala API that returns a Seq which can be consumed at the client side, as part of FLINK-1344 [1]. It looks like intermediate results support will help: basically, I would like to be able to initiate a stream endpoint at the client side, perhaps via the JobClient, referencing an intermediate result id. For streaming, this is a feature that Spark doesn't explicitly have (one has to use foreach and collect on a DStream, which is quite inefficient), so I guess it would be nice to add.
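
For reference, the Spark workaround I mean looks roughly like this (a
sketch; each micro-batch is pulled to the driver in a separate round
trip, which is the inefficient part):

import org.apache.spark.streaming.dstream.DStream

def consumeOnDriver[T](stream: DStream[T])(f: T => Unit): Unit =
  stream.foreachRDD { rdd =>
    rdd.collect().foreach(f) // ships every micro-batch to the driver
  }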

Paris

[1] https://issues.apache.org/jira/browse/FLINK-1344

Re: Gather a distributed dataset

Ufuk Celebi
In reply to this post by aalexandrov
Hey Alexander,

On 12 Jan 2015, at 11:42, Alexander Alexandrov <[hidden email]> wrote:

> [...]
>
> As far as I know, in Flink we have an equivalent for parallelize
>
> val x = env.fromCollection(...)
>
> but not for collect. Is this still the case?

Yes, but this will change soon.

> If yes, how hard would it be to add this feature at the moment? Can you
> give me some pointers?

There is an "alpha" version/hack of this using accumulators. See https://github.com/apache/flink/pull/210. The problem is that each collect call results in a new program being executed from the sources. I think Stephan is working on the scheduling to support this kind of program. From the runtime perspective, it is not a problem to transfer the produced intermediate results back to the job manager. The job manager can basically use the same mechanism that the task managers use. Even the accumulator version should be fine as an initial version.
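
Sketched, the accumulator approach looks roughly like this (illustrative
names, not the code of PR 210; written against the 0.8-era API, where
print() is a plain sink and execute() runs the job):

import org.apache.flink.api.common.accumulators.ListAccumulator
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._
import org.apache.flink.configuration.Configuration

val env = ExecutionEnvironment.getExecutionEnvironment

env.fromCollection(1 to 100)
  .map(new RichMapFunction[Int, Int] {
    private val acc = new ListAccumulator[Int]
    override def open(parameters: Configuration): Unit = {
      getRuntimeContext.addAccumulator("collected", acc)
    }
    override def map(value: Int): Int = {
      acc.add(value) // side-channel every element into the accumulator
      value
    }
  })
  .print() // any sink; the program still executes from the sources

// The accumulator contents travel back to the client with the result.
val result = env.execute("collect-via-accumulator")
val collected: java.util.List[Int] = result.getAccumulatorResult("collected")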

Re: Gather a distributed dataset

aalexandrov
Thanks, I am currently looking at the new ExecutionEnvironment API.

> I think Stephan is working on the scheduling to support this kind of
> program.

@Stephan: is there a feature branch for that somewhere?


Re: Gather a distributed dataset

Stephan Ewen
Hi!

To follow up on what Ufuk explained:

 - Ufuk is right, the problem is not getting the data set.
https://github.com/apache/flink/pull/210 does that for anything that is not
too gigantic, which is a good start. I think we should merge this as soon
as we agree on the signature and names of the API methods. We can swap the
internal realization for something more robust later.

 - For anything that just issues a program and wants the result back, this
is actually perfectly fine.

 - For true interactive programs, we need to backtrack to intermediate
results (rather than to the source) to avoid re-executing large parts. This
is the biggest missing piece, next to the persistent materialization of
intermediate results (Ufuk is working on this). The logic is the same as
for fault tolerance, so it is part of that development.

@alexander: I want to create the feature branch for that on Thursday. Are
you interested in contributing to that feature?

 - For streaming results continuously back, we need a different mechanism
than the accumulators. Let's create a design doc or thread and get working
on that. It probably involves adding another set of Akka messages from
TM -> JM -> Client. Or something like an extension to the BLOB manager for
streams?

Greetings,
Stephan


Re: Gather a distributed dataset

aalexandrov
@Stephan: yes, I would like to contribute (e.g. I can design the interfaces
and merge 210).

Please reply with more information once you have the branch. I can find
some time for that next week (at the expense of FLINK-1347
<https://issues.apache.org/jira/browse/FLINK-1347>, which hopefully can wait
3-4 more weeks).

Regards,
Alex


Re: Gather a distributed dataset

Ufuk Celebi
In reply to this post by Stephan Ewen

On 13 Jan 2015, at 16:50, Stephan Ewen <[hidden email]> wrote:

> [...]
>
> - For streaming results continuously back, we need a different mechanism
> than the accumulators. Let's create a design doc or thread and get working
> on that. It probably involves adding another set of Akka messages from
> TM -> JM -> Client. Or something like an extension to the BLOB manager for
> streams?

For streaming results back, we can use the same mechanisms used by the task managers. Let me add documentation (FLINK-1373) for the network stack this week.

Re: Gather a distributed dataset

Stephan Ewen
@Alex That sounds great. I added a few inline comments to PR 210 and then
it is good to merge. If you want, feel free to fix it up and we will merge
it.

Feel free to also add (or suggest and stub) more such functions. Is that
what you meant by "designing interfaces"?

Here is a thought that crossed my mind:
 - Should functions like reduce() and aggregate() (in their ungrouped
version) produce a "SingleValuedDataSet" (or ScalarDataSet) that is known
to have only a single value? That data set could offer an additional method
"get()" that directly grabs that value (rather than collect() returning a
list).
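
Roughly like this (purely hypothetical, nothing of it exists yet):

// Hypothetical: a data set that is statically known to hold one value.
class ScalarDataSet[T](underlying: DataSet[T]) {
  // Runs the program and hands back the single value directly,
  // instead of a one-element list from collect().
  def get(): T = underlying.collect().head
}

val sum: ScalarDataSet[Int] = ... // e.g. result of an ungrouped reduce()
val value: Int = sum.get()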

Stephan


Re: Gather a distributed dataset

aalexandrov
Thanks, I will have a look at your comments tomorrow and create a PR which
should supersede 210. BTW, is there already a test case where I can see the
suggested way to do staged execution with the new ExecutionEnvironment
API?

I thought about your second remark as well. The following lines summarize
some of these thoughts; they may (or may not) be useful for future
improvements in the API and the runtime design.

Ideally, a runtime for parallel collection processing should support at
least two types of values:

- collections of type DataSet[T]; these are consumed and produced by the
parallel dataflows;
- "simple types" of any type either T; these are used to represent
"broadcast variables" as well as results for global aggregates;

By "simple types" I don't mean that T should be a scalar -- it could also
be a complex type like a POJO or Typle; it rather entails that the system
does not "understand" and cannot make use of its internal type structure;

At the moment, the runtime only supports values of type DataSet[T]. This is
inconvenient because

- you have to wrap simple-typed values into a DataSet[T] in order to expose
them to your UDFs as broadcast variables; this is not visible to the user
but makes for some confusing code in the internals;
- global aggregates produce a value of type DataSet[T] rather than T; this
is inconsistent with the result type of the fold operator (and its
variants) as seen in other programming languages.

I think that the ideas in your email go in that direction. I suggest
having the following hierarchy of types:

- Value[T] - an abstract base for all values
- Singleton[T] extends Value[T] - a container for exactly one value of type
T
- DataSet[T] extends Value[T] - a container for a (parallelizable)
homogeneous collection of values of type T

We should then rethink which of the runtime operators can consume or
produce both value types and which can only consume/produce a Singleton[T]
or a DataSet[T], and adapt their signatures accordingly.
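
In code, the proposed hierarchy could look roughly like this (illustrative
only, not a concrete API proposal):

trait Value[T]

// Exactly one value of type T, e.g. a broadcast variable or the result
// of a global aggregate; get() returns the value itself, not a list.
trait Singleton[T] extends Value[T] {
  def get(): T
}

// A (parallelizable) homogeneous collection of values of type T.
trait DataSet[T] extends Value[T] {
  def collect(): Seq[T]
}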


RE: Gather a distributed dataset

Kruse, Sebastian
In reply to this post by Ufuk Celebi
Hi everyone,

I just wanted to give you a pointer to FLINK-1038: https://github.com/apache/flink/pull/94
This is an output format that can send DataSet contents via Java RMI to, e.g., the driver. I am currently using it a lot and it seems to scale pretty well.
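
To sketch the idea (not the code of the PR; this minimal version pushes
records back over a plain socket rather than RMI, and the class name is
made up):

import java.io.ObjectOutputStream
import java.net.Socket
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.configuration.Configuration

class PushToDriverOutputFormat[T <: AnyRef](driverHost: String, driverPort: Int)
    extends OutputFormat[T] {

  @transient private var out: ObjectOutputStream = _

  override def configure(parameters: Configuration): Unit = {}

  // Each parallel task opens its own connection back to the driver.
  override def open(taskNumber: Int, numTasks: Int): Unit = {
    out = new ObjectOutputStream(new Socket(driverHost, driverPort).getOutputStream)
  }

  override def writeRecord(record: T): Unit = out.writeObject(record)

  override def close(): Unit = if (out != null) out.close()
}

// usage: dataSet.output(new PushToDriverOutputFormat[MyType]("driver-host", 9999))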

Cheers,
Sebastian
