[DISCUSS] Support Interactive Programming in Flink Table API

classic Classic list List threaded Threaded
73 messages Options
1234
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
Hi all,

As a few recent email threads have pointed out, it is a promising
opportunity to enhance Flink Table API in various aspects, including
functionality and ease of use among others. One of the scenarios where we
feel Flink could improve is interactive programming. To explain the issues
and facilitate the discussion on the solution, we put together the
following document with our proposal.

https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing

Feedback and comments are very welcome!

Thanks,

Jiangjie (Becket) Qin
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

xingcanc
Hi Becket,

Thanks for bringing this up! For a long time, the intermediate cache problem has always been a pain point of the Flink streaming model. As far as I know, it’s quite a block for iterate operations in batch-related libs such as Gelly and FlinkML.

Actually, there’s an old JIRA[1], aiming to solve the cache problem more “thoroughly”. Compared with your proposal, it makes the persistence in DataSet level, which also allows the internal operations based on the DataSet API to benefit.

I totally understand the importance of Table API, but just wonder whether we should consider this problem in a larger view, i.e., adding a `PersistentService` rather than a `TablePersistentService` (as described in the "Flink Services" section).

Thanks,
Xingcan

[1] https://issues.apache.org/jira/browse/FLINK-1730

> On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
>
> Hi all,
>
> As a few recent email threads have pointed out, it is a promising
> opportunity to enhance Flink Table API in various aspects, including
> functionality and ease of use among others. One of the scenarios where we
> feel Flink could improve is interactive programming. To explain the issues
> and facilitate the discussion on the solution, we put together the
> following document with our proposal.
>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>
> Feedback and comments are very welcome!
>
> Thanks,
>
> Jiangjie (Becket) Qin

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Weihua Jiang
In reply to this post by Becket Qin
Hi Becket,

The design is quite interesting and useful.

I have several questions about your design:
1. Shall we add some persistence level hint to cache() function for
different temperature data? E.g. IN_MEM, IN_DISK, etc, or HOTTEST, HOT,
WARM, COLD?
2. When will the corresponding cached data be cleaned, by some kind of GC?
Shall we add uncache() function to allow user manually delete the cached
data?
3.  Must the FlinkService be a running service or Flink will run the
service in TM?

Thanks
Weihua

Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:

> Hi all,
>
> As a few recent email threads have pointed out, it is a promising
> opportunity to enhance Flink Table API in various aspects, including
> functionality and ease of use among others. One of the scenarios where we
> feel Flink could improve is interactive programming. To explain the issues
> and facilitate the discussion on the solution, we put together the
> following document with our proposal.
>
>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>
> Feedback and comments are very welcome!
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
In reply to this post by xingcanc
Hi Xingcan,

Thanks for the feedback.

Adding the cache to DataSet is useful. In fact, the current proposal does
not assume the "PersistService" can only be used by the Table. We can
always add DataSet.cache() and let it benefit from the underlying
persistency support. So it seems more of a wording issue. I'll clarify on
that. In this proposal we are trying to focus on Table API as it aligns
with other ongoing efforts at this point.

Regarding FLINK-1730, it looks that the actual difference is whether we put
the cache()/persist() method in a util class or within Table/DataSet
classes. Personally I prefer having the method with Table/DataSet classes.
It is more straightforward and concise so the users do not need to wonder
where the persist method is (or does it even exist). What do you think?

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 21, 2018 at 1:10 AM Xingcan Cui <[hidden email]> wrote:

> Hi Becket,
>
> Thanks for bringing this up! For a long time, the intermediate cache
> problem has always been a pain point of the Flink streaming model. As far
> as I know, it’s quite a block for iterate operations in batch-related libs
> such as Gelly and FlinkML.
>
> Actually, there’s an old JIRA[1], aiming to solve the cache problem more
> “thoroughly”. Compared with your proposal, it makes the persistence in
> DataSet level, which also allows the internal operations based on the
> DataSet API to benefit.
>
> I totally understand the importance of Table API, but just wonder whether
> we should consider this problem in a larger view, i.e., adding a
> `PersistentService` rather than a `TablePersistentService` (as described in
> the "Flink Services" section).
>
> Thanks,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-1730
>
> > On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
> >
> > Hi all,
> >
> > As a few recent email threads have pointed out, it is a promising
> > opportunity to enhance Flink Table API in various aspects, including
> > functionality and ease of use among others. One of the scenarios where we
> > feel Flink could improve is interactive programming. To explain the
> issues
> > and facilitate the discussion on the solution, we put together the
> > following document with our proposal.
> >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >
> > Feedback and comments are very welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Shaoxuan Wang
In reply to this post by xingcanc
Hi Xingcan,

Thanks for the comments. Yes, "cache/persistent the intermediate data" is
useful. It can bring benefit to many scenarios. But different scenarios may
have different ways to solve it. For instance, as I replied to
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Embracing-Table-API-in-Flink-ML-td25368.html,
I
expect FlinkML to be implemented on top of tableAPI in the near future. We
already have some ideas/prototypes about how to do the iterations on
tableAPI. Will share it to the DEV soon.

I am not sure what you mean by “more thoroughly”. If you are referring to
"more general”, I think the underlying implementation of our proposal can
indeed extend to other APIs. But for now we want to focus on the tableAPI,
as we see lots of the user interests on tableAPI as oppose to dataset. As
you may already read, our proposal basically consists of two parts, one of
which is the changes on the tableAPI, including the table.cache() and how
to hook the table/store service in the table environment. The other one is
to provide a table/store service interface, with which the user can
plug/config different table / storeService according to their own
environment. It is not difficult to implement the same functionality for
dataset as what we proposed.

Regards,
Shaoxuan


On Wed, Nov 21, 2018 at 1:10 AM Xingcan Cui <[hidden email]> wrote:

> Hi Becket,
>
> Thanks for bringing this up! For a long time, the intermediate cache
> problem has always been a pain point of the Flink streaming model. As far
> as I know, it’s quite a block for iterate operations in batch-related libs
> such as Gelly and FlinkML.
>
> Actually, there’s an old JIRA[1], aiming to solve the cache problem more
> “thoroughly”. Compared with your proposal, it makes the persistence in
> DataSet level, which also allows the internal operations based on the
> DataSet API to benefit.
> I totally understand the importance of Table API, but just wonder whether
> we should consider this problem in a larger view, i.e., adding a
> `PersistentService` rather than a `TablePersistentService` (as described in
> the "Flink Services" section).


> Thanks,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-1730
>
> > On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
> >
> > Hi all,
> >
> > As a few recent email threads have pointed out, it is a promising
> > opportunity to enhance Flink Table API in various aspects, including
> > functionality and ease of use among others. One of the scenarios where we
> > feel Flink could improve is interactive programming. To explain the
> issues
> > and facilitate the discussion on the solution, we put together the
> > following document with our proposal.
> >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >
> > Feedback and comments are very welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
In reply to this post by Weihua Jiang
Hi Weihua,

Thanks for the comments. These are great questions!

To answer question 1, I think it depends on what do we want from the cache
service. At this point, it is not quite clear to me whether Flink needs
different caching levels. For example, in Spark, the memory level caching
are mostly used for iteration. I kind of think it is a little ugly to ask
users to explicitly do cache() and uncache() when writing the iterations.
In Flink, the iteration is done more efficiently without requiring user
explicitly managing the cache. BTW, Table API does not have iteration
support at this point, but we have being working on this and will send a
design doc shortly.
Another consideration here is that if we allow pluggable temp table
services, those implementations may not be able to provide all levels of
caching, which will make the cache level a bit confusing.

WRT the cleanup of the temp tables. That is a great point. As of now, the
cleanup is done in the callback when the session exits, i.e. when the
application program finishes. This assumes that the caching service could
host all the cached tables created in the entire session. I agree that an
explicit uncache() could be useful, we should probably add that.

We haven't thought through the FlinkService API yet. A rough idea is that
there will be a ServiceDescriptor/ServiceConfig as the contract between
Flink and user defined service. The service could be configured to either
run in a standalone process or within TMs. That said, FlinkService itself
is probably a big topic and justifies a discussion thread on its own. In
this proposal, it only affects how the default caching service is launched,
we can always adapt that to the FlinkService API once that is nailed.

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 21, 2018 at 10:42 AM Weihua Jiang <[hidden email]>
wrote:

> Hi Becket,
>
> The design is quite interesting and useful.
>
> I have several questions about your design:
> 1. Shall we add some persistence level hint to cache() function for
> different temperature data? E.g. IN_MEM, IN_DISK, etc, or HOTTEST, HOT,
> WARM, COLD?
> 2. When will the corresponding cached data be cleaned, by some kind of GC?
> Shall we add uncache() function to allow user manually delete the cached
> data?
> 3.  Must the FlinkService be a running service or Flink will run the
> service in TM?
>
> Thanks
> Weihua
>
> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
>
> > Hi all,
> >
> > As a few recent email threads have pointed out, it is a promising
> > opportunity to enhance Flink Table API in various aspects, including
> > functionality and ease of use among others. One of the scenarios where we
> > feel Flink could improve is interactive programming. To explain the
> issues
> > and facilitate the discussion on the solution, we put together the
> > following document with our proposal.
> >
> >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >
> > Feedback and comments are very welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Ruidong Li
In reply to this post by xingcanc
Hi Becket,

I think the Flink Service is a good abstraction, with which we can easily
build Interactive Programing or some other features.
We might bring the concept of 'Session', then we can think of Flink
Services as system processes and user jobs as user processes, so the
management of life cycle need to be discussed.

Kind Regards
Xpray



Xingcan Cui <[hidden email]> 于2018年11月21日周三 上午1:10写道:

> Hi Becket,
>
> Thanks for bringing this up! For a long time, the intermediate cache
> problem has always been a pain point of the Flink streaming model. As far
> as I know, it’s quite a block for iterate operations in batch-related libs
> such as Gelly and FlinkML.
>
> Actually, there’s an old JIRA[1], aiming to solve the cache problem more
> “thoroughly”. Compared with your proposal, it makes the persistence in
> DataSet level, which also allows the internal operations based on the
> DataSet API to benefit.
>
> I totally understand the importance of Table API, but just wonder whether
> we should consider this problem in a larger view, i.e., adding a
> `PersistentService` rather than a `TablePersistentService` (as described in
> the "Flink Services" section).
>
> Thanks,
> Xingcan
>
> [1] https://issues.apache.org/jira/browse/FLINK-1730
>
> > On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
> >
> > Hi all,
> >
> > As a few recent email threads have pointed out, it is a promising
> > opportunity to enhance Flink Table API in various aspects, including
> > functionality and ease of use among others. One of the scenarios where we
> > feel Flink could improve is interactive programming. To explain the
> issues
> > and facilitate the discussion on the solution, we put together the
> > following document with our proposal.
> >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >
> > Feedback and comments are very welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

xingcanc
Hi all,

Thanks for the replies.

@Becket I think whether putting the persist/cache methods in a separated util class or inside the DataSet/Table depends on what we want to introduce. The former one sounds more like a data storage component where users may even somehow get a stored DataSet/Table via an ID or something, whereas the latter one sounds only like a cache mechanism. I’m not quite sure what we really need, but either approach is acceptable to me.

@Shaoxuan Yes, maybe “generally” is a more accurate word here. As the TableAPI only works with row type records, I just wondered whether a cache for that can be generalized on arbitrary data types. Anyway, if contributions can be made to enhance the TableAPI and rebuild other libs on it, that’s not a problem. Another point is, as I replied to @Becket, whether we introduce only a cache mechanism or a data storage component. IMO, compared to data storage, the cache could be volatile, which means it only works for (possibly?) accelerating and doesn’t need to absolutely guarantee the existence of DataSets/Tables.

What do you think?

Best,
Xingcan

> On Nov 21, 2018, at 5:44 AM, Ruidong Li <[hidden email]> wrote:
>
> Hi Becket,
>
> I think the Flink Service is a good abstraction, with which we can easily
> build Interactive Programing or some other features.
> We might bring the concept of 'Session', then we can think of Flink
> Services as system processes and user jobs as user processes, so the
> management of life cycle need to be discussed.
>
> Kind Regards
> Xpray
>
>
>
> Xingcan Cui <[hidden email]> 于2018年11月21日周三 上午1:10写道:
>
>> Hi Becket,
>>
>> Thanks for bringing this up! For a long time, the intermediate cache
>> problem has always been a pain point of the Flink streaming model. As far
>> as I know, it’s quite a block for iterate operations in batch-related libs
>> such as Gelly and FlinkML.
>>
>> Actually, there’s an old JIRA[1], aiming to solve the cache problem more
>> “thoroughly”. Compared with your proposal, it makes the persistence in
>> DataSet level, which also allows the internal operations based on the
>> DataSet API to benefit.
>>
>> I totally understand the importance of Table API, but just wonder whether
>> we should consider this problem in a larger view, i.e., adding a
>> `PersistentService` rather than a `TablePersistentService` (as described in
>> the "Flink Services" section).
>>
>> Thanks,
>> Xingcan
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-1730
>>
>>> On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
>>>
>>> Hi all,
>>>
>>> As a few recent email threads have pointed out, it is a promising
>>> opportunity to enhance Flink Table API in various aspects, including
>>> functionality and ease of use among others. One of the scenarios where we
>>> feel Flink could improve is interactive programming. To explain the
>> issues
>>> and facilitate the discussion on the solution, we put together the
>>> following document with our proposal.
>>>
>>>
>> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>>>
>>> Feedback and comments are very welcome!
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Shaoxuan Wang
Hi Xingcan,

I think you probably misunderstood our proposal. The proposed “cache()” API
basically infers the data is only available for its session, but not
forever available for other sessions to access. It will be cleaned when the
session exits. “cache” does not imply the underlying implementation only
utilizes the cache. Actually, the default implementation we proposed is
file system. In the future, we may want to extend a “persistent” interface
which allows the application really materializes the data in a QoS/lifetime
ensured storage.

I just left a comment and clarified this in the google doc. Feel free to
leave the comment in google doc if you have any further questions.

Regards,
Shaoxuan

On Thu, Nov 22, 2018 at 12:45 AM Xingcan Cui <[hidden email]> wrote:

> Hi all,
>
> Thanks for the replies.
>
> @Becket I think whether putting the persist/cache methods in a separated
> util class or inside the DataSet/Table depends on what we want to
> introduce. The former one sounds more like a data storage component where
> users may even somehow get a stored DataSet/Table via an ID or something,
> whereas the latter one sounds only like a cache mechanism. I’m not quite
> sure what we really need, but either approach is acceptable to me.
>
> @Shaoxuan Yes, maybe “generally” is a more accurate word here. As the
> TableAPI only works with row type records, I just wondered whether a cache
> for that can be generalized on arbitrary data types. Anyway, if
> contributions can be made to enhance the TableAPI and rebuild other libs on
> it, that’s not a problem. Another point is, as I replied to @Becket,
> whether we introduce only a cache mechanism or a data storage component.
> IMO, compared to data storage, the cache could be volatile, which means it
> only works for (possibly?) accelerating and doesn’t need to absolutely
> guarantee the existence of DataSets/Tables.
>
> What do you think?
>
> Best,
> Xingcan
>
> > On Nov 21, 2018, at 5:44 AM, Ruidong Li <[hidden email]> wrote:
> >
> > Hi Becket,
> >
> > I think the Flink Service is a good abstraction, with which we can easily
> > build Interactive Programing or some other features.
> > We might bring the concept of 'Session', then we can think of Flink
> > Services as system processes and user jobs as user processes, so the
> > management of life cycle need to be discussed.
> >
> > Kind Regards
> > Xpray
> >
> >
> >
> > Xingcan Cui <[hidden email]> 于2018年11月21日周三 上午1:10写道:
> >
> >> Hi Becket,
> >>
> >> Thanks for bringing this up! For a long time, the intermediate cache
> >> problem has always been a pain point of the Flink streaming model. As
> far
> >> as I know, it’s quite a block for iterate operations in batch-related
> libs
> >> such as Gelly and FlinkML.
> >>
> >> Actually, there’s an old JIRA[1], aiming to solve the cache problem more
> >> “thoroughly”. Compared with your proposal, it makes the persistence in
> >> DataSet level, which also allows the internal operations based on the
> >> DataSet API to benefit.
> >>
> >> I totally understand the importance of Table API, but just wonder
> whether
> >> we should consider this problem in a larger view, i.e., adding a
> >> `PersistentService` rather than a `TablePersistentService` (as
> described in
> >> the "Flink Services" section).
> >>
> >> Thanks,
> >> Xingcan
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-1730
> >>
> >>> On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> As a few recent email threads have pointed out, it is a promising
> >>> opportunity to enhance Flink Table API in various aspects, including
> >>> functionality and ease of use among others. One of the scenarios where
> we
> >>> feel Flink could improve is interactive programming. To explain the
> >> issues
> >>> and facilitate the discussion on the solution, we put together the
> >>> following document with our proposal.
> >>>
> >>>
> >>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >>>
> >>> Feedback and comments are very welcome!
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
In reply to this post by xingcanc
Hi Xingcan,

These a great points. We are on the same page regarding potential
capabilities of the proposed changes. There are actually two main parts in
the proposal, the API and the underlying service. Both parts can be
extended in the future.

We made a few design choices when draft the doc to restrict the scope of
this proposal, yet leave room for future extensions. For example, we did
not specify the interface of the underlying TableService. This is because
in the future, we may not only use it as a caching service, but also a
unified storage with functions such as stream/batch storage with indexing,
columnar/row-oriented formatting, schema awareness, etc.

Similarly, WRT API changes, right now we are just adding a cache() method,
and the cached table is only available within the session (it won't be lost
before the session exits). We found it already solves most of our concerns.
We can always add a persist(String tableId) method in the future, and let
the table be accessible globally. But this may introduce a lot of
interesting questions such as what if the table names conflict? Should
there be a session group? What should the life cycle look like for such
tables? Again, we are trying to restrict the scope and leave such questions
to future discussions.

Thanks,

Jiangjie (Becket) Qin







On Thu, Nov 22, 2018 at 12:45 AM Xingcan Cui <[hidden email]> wrote:

> Hi all,
>
> Thanks for the replies.
>
> @Becket I think whether putting the persist/cache methods in a separated
> util class or inside the DataSet/Table depends on what we want to
> introduce. The former one sounds more like a data storage component where
> users may even somehow get a stored DataSet/Table via an ID or something,
> whereas the latter one sounds only like a cache mechanism. I’m not quite
> sure what we really need, but either approach is acceptable to me.
>
> @Shaoxuan Yes, maybe “generally” is a more accurate word here. As the
> TableAPI only works with row type records, I just wondered whether a cache
> for that can be generalized on arbitrary data types. Anyway, if
> contributions can be made to enhance the TableAPI and rebuild other libs on
> it, that’s not a problem. Another point is, as I replied to @Becket,
> whether we introduce only a cache mechanism or a data storage component.
> IMO, compared to data storage, the cache could be volatile, which means it
> only works for (possibly?) accelerating and doesn’t need to absolutely
> guarantee the existence of DataSets/Tables.
>
> What do you think?
>
> Best,
> Xingcan
>
> > On Nov 21, 2018, at 5:44 AM, Ruidong Li <[hidden email]> wrote:
> >
> > Hi Becket,
> >
> > I think the Flink Service is a good abstraction, with which we can easily
> > build Interactive Programing or some other features.
> > We might bring the concept of 'Session', then we can think of Flink
> > Services as system processes and user jobs as user processes, so the
> > management of life cycle need to be discussed.
> >
> > Kind Regards
> > Xpray
> >
> >
> >
> > Xingcan Cui <[hidden email]> 于2018年11月21日周三 上午1:10写道:
> >
> >> Hi Becket,
> >>
> >> Thanks for bringing this up! For a long time, the intermediate cache
> >> problem has always been a pain point of the Flink streaming model. As
> far
> >> as I know, it’s quite a block for iterate operations in batch-related
> libs
> >> such as Gelly and FlinkML.
> >>
> >> Actually, there’s an old JIRA[1], aiming to solve the cache problem more
> >> “thoroughly”. Compared with your proposal, it makes the persistence in
> >> DataSet level, which also allows the internal operations based on the
> >> DataSet API to benefit.
> >>
> >> I totally understand the importance of Table API, but just wonder
> whether
> >> we should consider this problem in a larger view, i.e., adding a
> >> `PersistentService` rather than a `TablePersistentService` (as
> described in
> >> the "Flink Services" section).
> >>
> >> Thanks,
> >> Xingcan
> >>
> >> [1] https://issues.apache.org/jira/browse/FLINK-1730
> >>
> >>> On Nov 20, 2018, at 8:56 AM, Becket Qin <[hidden email]> wrote:
> >>>
> >>> Hi all,
> >>>
> >>> As a few recent email threads have pointed out, it is a promising
> >>> opportunity to enhance Flink Table API in various aspects, including
> >>> functionality and ease of use among others. One of the scenarios where
> we
> >>> feel Flink could improve is interactive programming. To explain the
> >> issues
> >>> and facilitate the discussion on the solution, we put together the
> >>> following document with our proposal.
> >>>
> >>>
> >>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >>>
> >>> Feedback and comments are very welcome!
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

jincheng sun
In reply to this post by Becket Qin
Hi Jiangjie&Shaoxuan,

Thanks for initiating this great proposal!

Interactive Programming is very useful and user friendly in case of your
examples.
Moreover, especially when a business has to be executed in several stages
with dependencies,such as the pipeline of Flink ML, in order to utilize the
intermediate calculation results we have to submit a job by env.execute().

About the `cache()`  , I think is better to named `persist()`, And The
Flink framework determines whether we internally cache in memory or persist
to the storage system,Maybe save the data into state backend
(MemoryStateBackend or RocksDBStateBackend etc.)

BTW, from the points of my view in the future, support for streaming and
batch mode switching in the same job will also benefit in "Interactive
Programming",  I am looking forward to your JIRAs and FLIP!

Best,
Jincheng


Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:

> Hi all,
>
> As a few recent email threads have pointed out, it is a promising
> opportunity to enhance Flink Table API in various aspects, including
> functionality and ease of use among others. One of the scenarios where we
> feel Flink could improve is interactive programming. To explain the issues
> and facilitate the discussion on the solution, we put together the
> following document with our proposal.
>
>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>
> Feedback and comments are very welcome!
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Xiaowei Jiang
Relying on a callback for the temp table for clean up is not very reliable.
There is no guarantee that it will be executed successfully. We may risk
leaks when that happens. I think that it's safer to have an association
between temp table and session id. So we can always clean up temp tables
which are no longer associated with any active sessions.

Regards,
Xiaowei

On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <[hidden email]>
wrote:

> Hi Jiangjie&Shaoxuan,
>
> Thanks for initiating this great proposal!
>
> Interactive Programming is very useful and user friendly in case of your
> examples.
> Moreover, especially when a business has to be executed in several stages
> with dependencies,such as the pipeline of Flink ML, in order to utilize the
> intermediate calculation results we have to submit a job by env.execute().
>
> About the `cache()`  , I think is better to named `persist()`, And The
> Flink framework determines whether we internally cache in memory or persist
> to the storage system,Maybe save the data into state backend
> (MemoryStateBackend or RocksDBStateBackend etc.)
>
> BTW, from the points of my view in the future, support for streaming and
> batch mode switching in the same job will also benefit in "Interactive
> Programming",  I am looking forward to your JIRAs and FLIP!
>
> Best,
> Jincheng
>
>
> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
>
> > Hi all,
> >
> > As a few recent email threads have pointed out, it is a promising
> > opportunity to enhance Flink Table API in various aspects, including
> > functionality and ease of use among others. One of the scenarios where we
> > feel Flink could improve is interactive programming. To explain the
> issues
> > and facilitate the discussion on the solution, we put together the
> > following document with our proposal.
> >
> >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >
> > Feedback and comments are very welcome!
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

xingcanc
Hi all,

@Shaoxuan, I think the lifecycle or access domain are both orthogonal to the cache problem. Essentially, this may be the first time we plan to introduce another storage mechanism other than the state. Maybe it’s better to first draw a big picture and then concentrate on a specific part?

@Becket, yes, actually I am more concerned with the underlying service. This seems to be quite a major change to the existing codebase. As you claimed, the service should be extendible to support other components and we’d better discussed it in another thread.

All in all, I also eager to enjoy the more interactive Table API, in case of a general and flexible enough service mechanism.

Best,
Xingcan

> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]> wrote:
>
> Relying on a callback for the temp table for clean up is not very reliable.
> There is no guarantee that it will be executed successfully. We may risk
> leaks when that happens. I think that it's safer to have an association
> between temp table and session id. So we can always clean up temp tables
> which are no longer associated with any active sessions.
>
> Regards,
> Xiaowei
>
> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <[hidden email]>
> wrote:
>
>> Hi Jiangjie&Shaoxuan,
>>
>> Thanks for initiating this great proposal!
>>
>> Interactive Programming is very useful and user friendly in case of your
>> examples.
>> Moreover, especially when a business has to be executed in several stages
>> with dependencies,such as the pipeline of Flink ML, in order to utilize the
>> intermediate calculation results we have to submit a job by env.execute().
>>
>> About the `cache()`  , I think is better to named `persist()`, And The
>> Flink framework determines whether we internally cache in memory or persist
>> to the storage system,Maybe save the data into state backend
>> (MemoryStateBackend or RocksDBStateBackend etc.)
>>
>> BTW, from the points of my view in the future, support for streaming and
>> batch mode switching in the same job will also benefit in "Interactive
>> Programming",  I am looking forward to your JIRAs and FLIP!
>>
>> Best,
>> Jincheng
>>
>>
>> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
>>
>>> Hi all,
>>>
>>> As a few recent email threads have pointed out, it is a promising
>>> opportunity to enhance Flink Table API in various aspects, including
>>> functionality and ease of use among others. One of the scenarios where we
>>> feel Flink could improve is interactive programming. To explain the
>> issues
>>> and facilitate the discussion on the solution, we put together the
>>> following document with our proposal.
>>>
>>>
>>>
>> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>>>
>>> Feedback and comments are very welcome!
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
Re: Jincheng,

Thanks for the feedback. Regarding cache() v.s. persist(), personally I
find cache() to be more accurately describing the behavior, i.e. the Table
is cached for the session, but will be deleted after the session is closed.
persist() seems a little misleading as people might think the table will
still be there even after the session is gone.

Great point about mixing the batch and stream processing in the same job.
We should absolutely move towards that goal. I imagine that would be a huge
change across the board, including sources, operators and optimizations, to
name some. Likely we will need several separate in-depth discussions.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]> wrote:

> Hi all,
>
> @Shaoxuan, I think the lifecycle or access domain are both orthogonal to
> the cache problem. Essentially, this may be the first time we plan to
> introduce another storage mechanism other than the state. Maybe it’s better
> to first draw a big picture and then concentrate on a specific part?
>
> @Becket, yes, actually I am more concerned with the underlying service.
> This seems to be quite a major change to the existing codebase. As you
> claimed, the service should be extendible to support other components and
> we’d better discussed it in another thread.
>
> All in all, I also eager to enjoy the more interactive Table API, in case
> of a general and flexible enough service mechanism.
>
> Best,
> Xingcan
>
> > On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]> wrote:
> >
> > Relying on a callback for the temp table for clean up is not very
> reliable.
> > There is no guarantee that it will be executed successfully. We may risk
> > leaks when that happens. I think that it's safer to have an association
> > between temp table and session id. So we can always clean up temp tables
> > which are no longer associated with any active sessions.
> >
> > Regards,
> > Xiaowei
> >
> > On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <[hidden email]>
> > wrote:
> >
> >> Hi Jiangjie&Shaoxuan,
> >>
> >> Thanks for initiating this great proposal!
> >>
> >> Interactive Programming is very useful and user friendly in case of your
> >> examples.
> >> Moreover, especially when a business has to be executed in several
> stages
> >> with dependencies,such as the pipeline of Flink ML, in order to utilize
> the
> >> intermediate calculation results we have to submit a job by
> env.execute().
> >>
> >> About the `cache()`  , I think is better to named `persist()`, And The
> >> Flink framework determines whether we internally cache in memory or
> persist
> >> to the storage system,Maybe save the data into state backend
> >> (MemoryStateBackend or RocksDBStateBackend etc.)
> >>
> >> BTW, from the points of my view in the future, support for streaming and
> >> batch mode switching in the same job will also benefit in "Interactive
> >> Programming",  I am looking forward to your JIRAs and FLIP!
> >>
> >> Best,
> >> Jincheng
> >>
> >>
> >> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
> >>
> >>> Hi all,
> >>>
> >>> As a few recent email threads have pointed out, it is a promising
> >>> opportunity to enhance Flink Table API in various aspects, including
> >>> functionality and ease of use among others. One of the scenarios where
> we
> >>> feel Flink could improve is interactive programming. To explain the
> >> issues
> >>> and facilitate the discussion on the solution, we put together the
> >>> following document with our proposal.
> >>>
> >>>
> >>>
> >>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >>>
> >>> Feedback and comments are very welcome!
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

jincheng sun
Hi Jiangjie,

Thank you for the explanation about the name of `cache()`, I understand why
you designed this way!

Another idea is whether we can specify a lifecycle for data persistence?
For example, persist (LifeCycle.SESSION), so that the user is not worried
about data loss, and will clearly specify the time range for keeping time.
At the same time, if we want to expand, we can also share in a certain
group of session, for example: LifeCycle.SESSION_GROUP(...), I am not sure,
just an immature suggestion, for reference only!

Bests,
Jincheng

Becket Qin <[hidden email]> 于2018年11月23日周五 下午1:33写道:

> Re: Jincheng,
>
> Thanks for the feedback. Regarding cache() v.s. persist(), personally I
> find cache() to be more accurately describing the behavior, i.e. the Table
> is cached for the session, but will be deleted after the session is closed.
> persist() seems a little misleading as people might think the table will
> still be there even after the session is gone.
>
> Great point about mixing the batch and stream processing in the same job.
> We should absolutely move towards that goal. I imagine that would be a huge
> change across the board, including sources, operators and optimizations, to
> name some. Likely we will need several separate in-depth discussions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]> wrote:
>
> > Hi all,
> >
> > @Shaoxuan, I think the lifecycle or access domain are both orthogonal to
> > the cache problem. Essentially, this may be the first time we plan to
> > introduce another storage mechanism other than the state. Maybe it’s
> better
> > to first draw a big picture and then concentrate on a specific part?
> >
> > @Becket, yes, actually I am more concerned with the underlying service.
> > This seems to be quite a major change to the existing codebase. As you
> > claimed, the service should be extendible to support other components and
> > we’d better discussed it in another thread.
> >
> > All in all, I also eager to enjoy the more interactive Table API, in case
> > of a general and flexible enough service mechanism.
> >
> > Best,
> > Xingcan
> >
> > > On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]>
> wrote:
> > >
> > > Relying on a callback for the temp table for clean up is not very
> > reliable.
> > > There is no guarantee that it will be executed successfully. We may
> risk
> > > leaks when that happens. I think that it's safer to have an association
> > > between temp table and session id. So we can always clean up temp
> tables
> > > which are no longer associated with any active sessions.
> > >
> > > Regards,
> > > Xiaowei
> > >
> > > On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
> [hidden email]>
> > > wrote:
> > >
> > >> Hi Jiangjie&Shaoxuan,
> > >>
> > >> Thanks for initiating this great proposal!
> > >>
> > >> Interactive Programming is very useful and user friendly in case of
> your
> > >> examples.
> > >> Moreover, especially when a business has to be executed in several
> > stages
> > >> with dependencies,such as the pipeline of Flink ML, in order to
> utilize
> > the
> > >> intermediate calculation results we have to submit a job by
> > env.execute().
> > >>
> > >> About the `cache()`  , I think is better to named `persist()`, And The
> > >> Flink framework determines whether we internally cache in memory or
> > persist
> > >> to the storage system,Maybe save the data into state backend
> > >> (MemoryStateBackend or RocksDBStateBackend etc.)
> > >>
> > >> BTW, from the points of my view in the future, support for streaming
> and
> > >> batch mode switching in the same job will also benefit in "Interactive
> > >> Programming",  I am looking forward to your JIRAs and FLIP!
> > >>
> > >> Best,
> > >> Jincheng
> > >>
> > >>
> > >> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
> > >>
> > >>> Hi all,
> > >>>
> > >>> As a few recent email threads have pointed out, it is a promising
> > >>> opportunity to enhance Flink Table API in various aspects, including
> > >>> functionality and ease of use among others. One of the scenarios
> where
> > we
> > >>> feel Flink could improve is interactive programming. To explain the
> > >> issues
> > >>> and facilitate the discussion on the solution, we put together the
> > >>> following document with our proposal.
> > >>>
> > >>>
> > >>>
> > >>
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> > >>>
> > >>> Feedback and comments are very welcome!
> > >>>
> > >>> Thanks,
> > >>>
> > >>> Jiangjie (Becket) Qin
> > >>>
> > >>
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
In reply to this post by Becket Qin
Hi Xiaowei,

Thanks for the comment. That is a valid point.

The callback is not only associated with a particular temp table. It is a
clean up logic provided by the user. The temp table to session ID mapping
is tracked internally. We also need to associate the callback with the
session lifecycle and make sure it will be invoked when the session exits,
whether normally or abnormally. We haven't decided how exactly that should
be done yet. Several options being explored are:
1. Invoke the callback the Yarn application session shutdown hook if there
is one. (probably the best option if available)
2. Put the logic into Yarn AM.
3. Launch a WatchDog service and let it heartbeat to the client. If the
client indicates the session is closed or the client goes away
accidentally, the cleanup service will just kick in.

In any case, the callback is unlikely to be invoked on the client side.

Thanks,

Jiangjie (Becket) Qin


On Fri, Nov 23, 2018 at 1:32 PM Becket Qin <[hidden email]> wrote:

> Re: Jincheng,
>
> Thanks for the feedback. Regarding cache() v.s. persist(), personally I
> find cache() to be more accurately describing the behavior, i.e. the Table
> is cached for the session, but will be deleted after the session is closed.
> persist() seems a little misleading as people might think the table will
> still be there even after the session is gone.
>
> Great point about mixing the batch and stream processing in the same job.
> We should absolutely move towards that goal. I imagine that would be a huge
> change across the board, including sources, operators and optimizations, to
> name some. Likely we will need several separate in-depth discussions.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]> wrote:
>
>> Hi all,
>>
>> @Shaoxuan, I think the lifecycle or access domain are both orthogonal to
>> the cache problem. Essentially, this may be the first time we plan to
>> introduce another storage mechanism other than the state. Maybe it’s better
>> to first draw a big picture and then concentrate on a specific part?
>>
>> @Becket, yes, actually I am more concerned with the underlying service.
>> This seems to be quite a major change to the existing codebase. As you
>> claimed, the service should be extendible to support other components and
>> we’d better discussed it in another thread.
>>
>> All in all, I also eager to enjoy the more interactive Table API, in case
>> of a general and flexible enough service mechanism.
>>
>> Best,
>> Xingcan
>>
>> > On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]> wrote:
>> >
>> > Relying on a callback for the temp table for clean up is not very
>> reliable.
>> > There is no guarantee that it will be executed successfully. We may risk
>> > leaks when that happens. I think that it's safer to have an association
>> > between temp table and session id. So we can always clean up temp tables
>> > which are no longer associated with any active sessions.
>> >
>> > Regards,
>> > Xiaowei
>> >
>> > On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <[hidden email]
>> >
>> > wrote:
>> >
>> >> Hi Jiangjie&Shaoxuan,
>> >>
>> >> Thanks for initiating this great proposal!
>> >>
>> >> Interactive Programming is very useful and user friendly in case of
>> your
>> >> examples.
>> >> Moreover, especially when a business has to be executed in several
>> stages
>> >> with dependencies,such as the pipeline of Flink ML, in order to
>> utilize the
>> >> intermediate calculation results we have to submit a job by
>> env.execute().
>> >>
>> >> About the `cache()`  , I think is better to named `persist()`, And The
>> >> Flink framework determines whether we internally cache in memory or
>> persist
>> >> to the storage system,Maybe save the data into state backend
>> >> (MemoryStateBackend or RocksDBStateBackend etc.)
>> >>
>> >> BTW, from the points of my view in the future, support for streaming
>> and
>> >> batch mode switching in the same job will also benefit in "Interactive
>> >> Programming",  I am looking forward to your JIRAs and FLIP!
>> >>
>> >> Best,
>> >> Jincheng
>> >>
>> >>
>> >> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
>> >>
>> >>> Hi all,
>> >>>
>> >>> As a few recent email threads have pointed out, it is a promising
>> >>> opportunity to enhance Flink Table API in various aspects, including
>> >>> functionality and ease of use among others. One of the scenarios
>> where we
>> >>> feel Flink could improve is interactive programming. To explain the
>> >> issues
>> >>> and facilitate the discussion on the solution, we put together the
>> >>> following document with our proposal.
>> >>>
>> >>>
>> >>>
>> >>
>> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>> >>>
>> >>> Feedback and comments are very welcome!
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Jiangjie (Becket) Qin
>> >>>
>> >>
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
In reply to this post by jincheng sun
Thanks for the suggestion, Jincheng.

Yes, I think it makes sense to have a persist() with lifecycle/defined
scope. I just added a section in the future work for this.

 Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <[hidden email]>
wrote:

> Hi Jiangjie,
>
> Thank you for the explanation about the name of `cache()`, I understand why
> you designed this way!
>
> Another idea is whether we can specify a lifecycle for data persistence?
> For example, persist (LifeCycle.SESSION), so that the user is not worried
> about data loss, and will clearly specify the time range for keeping time.
> At the same time, if we want to expand, we can also share in a certain
> group of session, for example: LifeCycle.SESSION_GROUP(...), I am not sure,
> just an immature suggestion, for reference only!
>
> Bests,
> Jincheng
>
> Becket Qin <[hidden email]> 于2018年11月23日周五 下午1:33写道:
>
> > Re: Jincheng,
> >
> > Thanks for the feedback. Regarding cache() v.s. persist(), personally I
> > find cache() to be more accurately describing the behavior, i.e. the
> Table
> > is cached for the session, but will be deleted after the session is
> closed.
> > persist() seems a little misleading as people might think the table will
> > still be there even after the session is gone.
> >
> > Great point about mixing the batch and stream processing in the same job.
> > We should absolutely move towards that goal. I imagine that would be a
> huge
> > change across the board, including sources, operators and optimizations,
> to
> > name some. Likely we will need several separate in-depth discussions.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]> wrote:
> >
> > > Hi all,
> > >
> > > @Shaoxuan, I think the lifecycle or access domain are both orthogonal
> to
> > > the cache problem. Essentially, this may be the first time we plan to
> > > introduce another storage mechanism other than the state. Maybe it’s
> > better
> > > to first draw a big picture and then concentrate on a specific part?
> > >
> > > @Becket, yes, actually I am more concerned with the underlying service.
> > > This seems to be quite a major change to the existing codebase. As you
> > > claimed, the service should be extendible to support other components
> and
> > > we’d better discussed it in another thread.
> > >
> > > All in all, I also eager to enjoy the more interactive Table API, in
> case
> > > of a general and flexible enough service mechanism.
> > >
> > > Best,
> > > Xingcan
> > >
> > > > On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]>
> > wrote:
> > > >
> > > > Relying on a callback for the temp table for clean up is not very
> > > reliable.
> > > > There is no guarantee that it will be executed successfully. We may
> > risk
> > > > leaks when that happens. I think that it's safer to have an
> association
> > > > between temp table and session id. So we can always clean up temp
> > tables
> > > > which are no longer associated with any active sessions.
> > > >
> > > > Regards,
> > > > Xiaowei
> > > >
> > > > On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
> > [hidden email]>
> > > > wrote:
> > > >
> > > >> Hi Jiangjie&Shaoxuan,
> > > >>
> > > >> Thanks for initiating this great proposal!
> > > >>
> > > >> Interactive Programming is very useful and user friendly in case of
> > your
> > > >> examples.
> > > >> Moreover, especially when a business has to be executed in several
> > > stages
> > > >> with dependencies,such as the pipeline of Flink ML, in order to
> > utilize
> > > the
> > > >> intermediate calculation results we have to submit a job by
> > > env.execute().
> > > >>
> > > >> About the `cache()`  , I think is better to named `persist()`, And
> The
> > > >> Flink framework determines whether we internally cache in memory or
> > > persist
> > > >> to the storage system,Maybe save the data into state backend
> > > >> (MemoryStateBackend or RocksDBStateBackend etc.)
> > > >>
> > > >> BTW, from the points of my view in the future, support for streaming
> > and
> > > >> batch mode switching in the same job will also benefit in
> "Interactive
> > > >> Programming",  I am looking forward to your JIRAs and FLIP!
> > > >>
> > > >> Best,
> > > >> Jincheng
> > > >>
> > > >>
> > > >> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
> > > >>
> > > >>> Hi all,
> > > >>>
> > > >>> As a few recent email threads have pointed out, it is a promising
> > > >>> opportunity to enhance Flink Table API in various aspects,
> including
> > > >>> functionality and ease of use among others. One of the scenarios
> > where
> > > we
> > > >>> feel Flink could improve is interactive programming. To explain the
> > > >> issues
> > > >>> and facilitate the discussion on the solution, we put together the
> > > >>> following document with our proposal.
> > > >>>
> > > >>>
> > > >>>
> > > >>
> > >
> >
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> > > >>>
> > > >>> Feedback and comments are very welcome!
> > > >>>
> > > >>> Thanks,
> > > >>>
> > > >>> Jiangjie (Becket) Qin
> > > >>>
> > > >>
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Piotr Nowojski
Hi,

Interesting idea. I’m trying to understand the problem. Isn’t the `cache()` call an equivalent of writing data to a sink and later reading from it? Where this sink has a limited live scope/live time? And the sink could be implemented as in memory or a file sink?

If so, what’s the problem with creating a materialised view from a table “b” (from your document’s example) and reusing this materialised view later? Maybe we are lacking mechanisms to clean up materialised views (for example when current session finishes)? Maybe we need some syntactic sugar on top of it?  

Piotrek

> On 23 Nov 2018, at 07:21, Becket Qin <[hidden email]> wrote:
>
> Thanks for the suggestion, Jincheng.
>
> Yes, I think it makes sense to have a persist() with lifecycle/defined
> scope. I just added a section in the future work for this.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <[hidden email]>
> wrote:
>
>> Hi Jiangjie,
>>
>> Thank you for the explanation about the name of `cache()`, I understand why
>> you designed this way!
>>
>> Another idea is whether we can specify a lifecycle for data persistence?
>> For example, persist (LifeCycle.SESSION), so that the user is not worried
>> about data loss, and will clearly specify the time range for keeping time.
>> At the same time, if we want to expand, we can also share in a certain
>> group of session, for example: LifeCycle.SESSION_GROUP(...), I am not sure,
>> just an immature suggestion, for reference only!
>>
>> Bests,
>> Jincheng
>>
>> Becket Qin <[hidden email]> 于2018年11月23日周五 下午1:33写道:
>>
>>> Re: Jincheng,
>>>
>>> Thanks for the feedback. Regarding cache() v.s. persist(), personally I
>>> find cache() to be more accurately describing the behavior, i.e. the
>> Table
>>> is cached for the session, but will be deleted after the session is
>> closed.
>>> persist() seems a little misleading as people might think the table will
>>> still be there even after the session is gone.
>>>
>>> Great point about mixing the batch and stream processing in the same job.
>>> We should absolutely move towards that goal. I imagine that would be a
>> huge
>>> change across the board, including sources, operators and optimizations,
>> to
>>> name some. Likely we will need several separate in-depth discussions.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]> wrote:
>>>
>>>> Hi all,
>>>>
>>>> @Shaoxuan, I think the lifecycle or access domain are both orthogonal
>> to
>>>> the cache problem. Essentially, this may be the first time we plan to
>>>> introduce another storage mechanism other than the state. Maybe it’s
>>> better
>>>> to first draw a big picture and then concentrate on a specific part?
>>>>
>>>> @Becket, yes, actually I am more concerned with the underlying service.
>>>> This seems to be quite a major change to the existing codebase. As you
>>>> claimed, the service should be extendible to support other components
>> and
>>>> we’d better discussed it in another thread.
>>>>
>>>> All in all, I also eager to enjoy the more interactive Table API, in
>> case
>>>> of a general and flexible enough service mechanism.
>>>>
>>>> Best,
>>>> Xingcan
>>>>
>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]>
>>> wrote:
>>>>>
>>>>> Relying on a callback for the temp table for clean up is not very
>>>> reliable.
>>>>> There is no guarantee that it will be executed successfully. We may
>>> risk
>>>>> leaks when that happens. I think that it's safer to have an
>> association
>>>>> between temp table and session id. So we can always clean up temp
>>> tables
>>>>> which are no longer associated with any active sessions.
>>>>>
>>>>> Regards,
>>>>> Xiaowei
>>>>>
>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
>>> [hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi Jiangjie&Shaoxuan,
>>>>>>
>>>>>> Thanks for initiating this great proposal!
>>>>>>
>>>>>> Interactive Programming is very useful and user friendly in case of
>>> your
>>>>>> examples.
>>>>>> Moreover, especially when a business has to be executed in several
>>>> stages
>>>>>> with dependencies,such as the pipeline of Flink ML, in order to
>>> utilize
>>>> the
>>>>>> intermediate calculation results we have to submit a job by
>>>> env.execute().
>>>>>>
>>>>>> About the `cache()`  , I think is better to named `persist()`, And
>> The
>>>>>> Flink framework determines whether we internally cache in memory or
>>>> persist
>>>>>> to the storage system,Maybe save the data into state backend
>>>>>> (MemoryStateBackend or RocksDBStateBackend etc.)
>>>>>>
>>>>>> BTW, from the points of my view in the future, support for streaming
>>> and
>>>>>> batch mode switching in the same job will also benefit in
>> "Interactive
>>>>>> Programming",  I am looking forward to your JIRAs and FLIP!
>>>>>>
>>>>>> Best,
>>>>>> Jincheng
>>>>>>
>>>>>>
>>>>>> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> As a few recent email threads have pointed out, it is a promising
>>>>>>> opportunity to enhance Flink Table API in various aspects,
>> including
>>>>>>> functionality and ease of use among others. One of the scenarios
>>> where
>>>> we
>>>>>>> feel Flink could improve is interactive programming. To explain the
>>>>>> issues
>>>>>>> and facilitate the discussion on the solution, we put together the
>>>>>>> following document with our proposal.
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>
>>>
>> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>>>>>>>
>>>>>>> Feedback and comments are very welcome!
>>>>>>>
>>>>>>> Thanks,
>>>>>>>
>>>>>>> Jiangjie (Becket) Qin
>>>>>>>
>>>>>>
>>>>
>>>>
>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Becket Qin
Hi Piotrek,

For the cache() method itself, yes, it is equivalent to creating a BUILT-IN
materialized view with a lifecycle. That functionality is missing today,
though. Not sure if I understand your question. Do you mean we already have
the functionality and just need a syntax sugar?

What's more interesting in the proposal is do we want to stop at creating
the materialized view? Or do we want to extend that in the future to a more
useful unified data store distributed with Flink? And do we want to have a
mechanism allow more flexible user job pattern with their own user defined
services. These considerations are much more architectural.

Thanks,

Jiangjie (Becket) Qin

On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <[hidden email]>
wrote:

> Hi,
>
> Interesting idea. I’m trying to understand the problem. Isn’t the
> `cache()` call an equivalent of writing data to a sink and later reading
> from it? Where this sink has a limited live scope/live time? And the sink
> could be implemented as in memory or a file sink?
>
> If so, what’s the problem with creating a materialised view from a table
> “b” (from your document’s example) and reusing this materialised view
> later? Maybe we are lacking mechanisms to clean up materialised views (for
> example when current session finishes)? Maybe we need some syntactic sugar
> on top of it?
>
> Piotrek
>
> > On 23 Nov 2018, at 07:21, Becket Qin <[hidden email]> wrote:
> >
> > Thanks for the suggestion, Jincheng.
> >
> > Yes, I think it makes sense to have a persist() with lifecycle/defined
> > scope. I just added a section in the future work for this.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <[hidden email]>
> > wrote:
> >
> >> Hi Jiangjie,
> >>
> >> Thank you for the explanation about the name of `cache()`, I understand
> why
> >> you designed this way!
> >>
> >> Another idea is whether we can specify a lifecycle for data persistence?
> >> For example, persist (LifeCycle.SESSION), so that the user is not
> worried
> >> about data loss, and will clearly specify the time range for keeping
> time.
> >> At the same time, if we want to expand, we can also share in a certain
> >> group of session, for example: LifeCycle.SESSION_GROUP(...), I am not
> sure,
> >> just an immature suggestion, for reference only!
> >>
> >> Bests,
> >> Jincheng
> >>
> >> Becket Qin <[hidden email]> 于2018年11月23日周五 下午1:33写道:
> >>
> >>> Re: Jincheng,
> >>>
> >>> Thanks for the feedback. Regarding cache() v.s. persist(), personally I
> >>> find cache() to be more accurately describing the behavior, i.e. the
> >> Table
> >>> is cached for the session, but will be deleted after the session is
> >> closed.
> >>> persist() seems a little misleading as people might think the table
> will
> >>> still be there even after the session is gone.
> >>>
> >>> Great point about mixing the batch and stream processing in the same
> job.
> >>> We should absolutely move towards that goal. I imagine that would be a
> >> huge
> >>> change across the board, including sources, operators and
> optimizations,
> >> to
> >>> name some. Likely we will need several separate in-depth discussions.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]>
> wrote:
> >>>
> >>>> Hi all,
> >>>>
> >>>> @Shaoxuan, I think the lifecycle or access domain are both orthogonal
> >> to
> >>>> the cache problem. Essentially, this may be the first time we plan to
> >>>> introduce another storage mechanism other than the state. Maybe it’s
> >>> better
> >>>> to first draw a big picture and then concentrate on a specific part?
> >>>>
> >>>> @Becket, yes, actually I am more concerned with the underlying
> service.
> >>>> This seems to be quite a major change to the existing codebase. As you
> >>>> claimed, the service should be extendible to support other components
> >> and
> >>>> we’d better discussed it in another thread.
> >>>>
> >>>> All in all, I also eager to enjoy the more interactive Table API, in
> >> case
> >>>> of a general and flexible enough service mechanism.
> >>>>
> >>>> Best,
> >>>> Xingcan
> >>>>
> >>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]>
> >>> wrote:
> >>>>>
> >>>>> Relying on a callback for the temp table for clean up is not very
> >>>> reliable.
> >>>>> There is no guarantee that it will be executed successfully. We may
> >>> risk
> >>>>> leaks when that happens. I think that it's safer to have an
> >> association
> >>>>> between temp table and session id. So we can always clean up temp
> >>> tables
> >>>>> which are no longer associated with any active sessions.
> >>>>>
> >>>>> Regards,
> >>>>> Xiaowei
> >>>>>
> >>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
> >>> [hidden email]>
> >>>>> wrote:
> >>>>>
> >>>>>> Hi Jiangjie&Shaoxuan,
> >>>>>>
> >>>>>> Thanks for initiating this great proposal!
> >>>>>>
> >>>>>> Interactive Programming is very useful and user friendly in case of
> >>> your
> >>>>>> examples.
> >>>>>> Moreover, especially when a business has to be executed in several
> >>>> stages
> >>>>>> with dependencies,such as the pipeline of Flink ML, in order to
> >>> utilize
> >>>> the
> >>>>>> intermediate calculation results we have to submit a job by
> >>>> env.execute().
> >>>>>>
> >>>>>> About the `cache()`  , I think is better to named `persist()`, And
> >> The
> >>>>>> Flink framework determines whether we internally cache in memory or
> >>>> persist
> >>>>>> to the storage system,Maybe save the data into state backend
> >>>>>> (MemoryStateBackend or RocksDBStateBackend etc.)
> >>>>>>
> >>>>>> BTW, from the points of my view in the future, support for streaming
> >>> and
> >>>>>> batch mode switching in the same job will also benefit in
> >> "Interactive
> >>>>>> Programming",  I am looking forward to your JIRAs and FLIP!
> >>>>>>
> >>>>>> Best,
> >>>>>> Jincheng
> >>>>>>
> >>>>>>
> >>>>>> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
> >>>>>>
> >>>>>>> Hi all,
> >>>>>>>
> >>>>>>> As a few recent email threads have pointed out, it is a promising
> >>>>>>> opportunity to enhance Flink Table API in various aspects,
> >> including
> >>>>>>> functionality and ease of use among others. One of the scenarios
> >>> where
> >>>> we
> >>>>>>> feel Flink could improve is interactive programming. To explain the
> >>>>>> issues
> >>>>>>> and facilitate the discussion on the solution, we put together the
> >>>>>>> following document with our proposal.
> >>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>
> >>>
> >>
> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
> >>>>>>>
> >>>>>>> Feedback and comments are very welcome!
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>>
> >>>>>>> Jiangjie (Becket) Qin
> >>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] Support Interactive Programming in Flink Table API

Piotr Nowojski
Hi Becket,

Ops, sorry I didn’t notice that you intend to reuse existing `TableFactory`. I don’t know why, but I assumed that you want to provide an alternate way of writing the data.

Now that I hopefully understand the proposal, maybe we could rename `cache()` to

void materialize()

or going step further

MaterializedTable materialize()
MaterializedTable createMaterializedView()

?

The second option with returning a handle I think is more flexible and could provide features such as “refresh”/“delete” or generally speaking manage the the view. In the future we could also think about adding hooks to automatically refresh view etc. It is also more explicit - materialization returning a new table handle will not have the same implicit side effects as adding a simple line of code like `b.cache()` would have.

It would also be more SQL like, making it more intuitive for users already familiar with the SQL.

Piotrek

> On 23 Nov 2018, at 14:53, Becket Qin <[hidden email]> wrote:
>
> Hi Piotrek,
>
> For the cache() method itself, yes, it is equivalent to creating a BUILT-IN
> materialized view with a lifecycle. That functionality is missing today,
> though. Not sure if I understand your question. Do you mean we already have
> the functionality and just need a syntax sugar?
>
> What's more interesting in the proposal is do we want to stop at creating
> the materialized view? Or do we want to extend that in the future to a more
> useful unified data store distributed with Flink? And do we want to have a
> mechanism allow more flexible user job pattern with their own user defined
> services. These considerations are much more architectural.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Fri, Nov 23, 2018 at 6:01 PM Piotr Nowojski <[hidden email]>
> wrote:
>
>> Hi,
>>
>> Interesting idea. I’m trying to understand the problem. Isn’t the
>> `cache()` call an equivalent of writing data to a sink and later reading
>> from it? Where this sink has a limited live scope/live time? And the sink
>> could be implemented as in memory or a file sink?
>>
>> If so, what’s the problem with creating a materialised view from a table
>> “b” (from your document’s example) and reusing this materialised view
>> later? Maybe we are lacking mechanisms to clean up materialised views (for
>> example when current session finishes)? Maybe we need some syntactic sugar
>> on top of it?
>>
>> Piotrek
>>
>>> On 23 Nov 2018, at 07:21, Becket Qin <[hidden email]> wrote:
>>>
>>> Thanks for the suggestion, Jincheng.
>>>
>>> Yes, I think it makes sense to have a persist() with lifecycle/defined
>>> scope. I just added a section in the future work for this.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Nov 23, 2018 at 1:55 PM jincheng sun <[hidden email]>
>>> wrote:
>>>
>>>> Hi Jiangjie,
>>>>
>>>> Thank you for the explanation about the name of `cache()`, I understand
>> why
>>>> you designed this way!
>>>>
>>>> Another idea is whether we can specify a lifecycle for data persistence?
>>>> For example, persist (LifeCycle.SESSION), so that the user is not
>> worried
>>>> about data loss, and will clearly specify the time range for keeping
>> time.
>>>> At the same time, if we want to expand, we can also share in a certain
>>>> group of session, for example: LifeCycle.SESSION_GROUP(...), I am not
>> sure,
>>>> just an immature suggestion, for reference only!
>>>>
>>>> Bests,
>>>> Jincheng
>>>>
>>>> Becket Qin <[hidden email]> 于2018年11月23日周五 下午1:33写道:
>>>>
>>>>> Re: Jincheng,
>>>>>
>>>>> Thanks for the feedback. Regarding cache() v.s. persist(), personally I
>>>>> find cache() to be more accurately describing the behavior, i.e. the
>>>> Table
>>>>> is cached for the session, but will be deleted after the session is
>>>> closed.
>>>>> persist() seems a little misleading as people might think the table
>> will
>>>>> still be there even after the session is gone.
>>>>>
>>>>> Great point about mixing the batch and stream processing in the same
>> job.
>>>>> We should absolutely move towards that goal. I imagine that would be a
>>>> huge
>>>>> change across the board, including sources, operators and
>> optimizations,
>>>> to
>>>>> name some. Likely we will need several separate in-depth discussions.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>> On Fri, Nov 23, 2018 at 5:14 AM Xingcan Cui <[hidden email]>
>> wrote:
>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> @Shaoxuan, I think the lifecycle or access domain are both orthogonal
>>>> to
>>>>>> the cache problem. Essentially, this may be the first time we plan to
>>>>>> introduce another storage mechanism other than the state. Maybe it’s
>>>>> better
>>>>>> to first draw a big picture and then concentrate on a specific part?
>>>>>>
>>>>>> @Becket, yes, actually I am more concerned with the underlying
>> service.
>>>>>> This seems to be quite a major change to the existing codebase. As you
>>>>>> claimed, the service should be extendible to support other components
>>>> and
>>>>>> we’d better discussed it in another thread.
>>>>>>
>>>>>> All in all, I also eager to enjoy the more interactive Table API, in
>>>> case
>>>>>> of a general and flexible enough service mechanism.
>>>>>>
>>>>>> Best,
>>>>>> Xingcan
>>>>>>
>>>>>>> On Nov 22, 2018, at 10:16 AM, Xiaowei Jiang <[hidden email]>
>>>>> wrote:
>>>>>>>
>>>>>>> Relying on a callback for the temp table for clean up is not very
>>>>>> reliable.
>>>>>>> There is no guarantee that it will be executed successfully. We may
>>>>> risk
>>>>>>> leaks when that happens. I think that it's safer to have an
>>>> association
>>>>>>> between temp table and session id. So we can always clean up temp
>>>>> tables
>>>>>>> which are no longer associated with any active sessions.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Xiaowei
>>>>>>>
>>>>>>> On Thu, Nov 22, 2018 at 12:55 PM jincheng sun <
>>>>> [hidden email]>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Hi Jiangjie&Shaoxuan,
>>>>>>>>
>>>>>>>> Thanks for initiating this great proposal!
>>>>>>>>
>>>>>>>> Interactive Programming is very useful and user friendly in case of
>>>>> your
>>>>>>>> examples.
>>>>>>>> Moreover, especially when a business has to be executed in several
>>>>>> stages
>>>>>>>> with dependencies,such as the pipeline of Flink ML, in order to
>>>>> utilize
>>>>>> the
>>>>>>>> intermediate calculation results we have to submit a job by
>>>>>> env.execute().
>>>>>>>>
>>>>>>>> About the `cache()`  , I think is better to named `persist()`, And
>>>> The
>>>>>>>> Flink framework determines whether we internally cache in memory or
>>>>>> persist
>>>>>>>> to the storage system,Maybe save the data into state backend
>>>>>>>> (MemoryStateBackend or RocksDBStateBackend etc.)
>>>>>>>>
>>>>>>>> BTW, from the points of my view in the future, support for streaming
>>>>> and
>>>>>>>> batch mode switching in the same job will also benefit in
>>>> "Interactive
>>>>>>>> Programming",  I am looking forward to your JIRAs and FLIP!
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jincheng
>>>>>>>>
>>>>>>>>
>>>>>>>> Becket Qin <[hidden email]> 于2018年11月20日周二 下午9:56写道:
>>>>>>>>
>>>>>>>>> Hi all,
>>>>>>>>>
>>>>>>>>> As a few recent email threads have pointed out, it is a promising
>>>>>>>>> opportunity to enhance Flink Table API in various aspects,
>>>> including
>>>>>>>>> functionality and ease of use among others. One of the scenarios
>>>>> where
>>>>>> we
>>>>>>>>> feel Flink could improve is interactive programming. To explain the
>>>>>>>> issues
>>>>>>>>> and facilitate the discussion on the solution, we put together the
>>>>>>>>> following document with our proposal.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>>>>
>> https://docs.google.com/document/d/1d4T2zTyfe7hdncEUAxrlNOYr4e5IMNEZLyqSuuswkA0/edit?usp=sharing
>>>>>>>>>
>>>>>>>>> Feedback and comments are very welcome!
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>
>>

1234