[DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi folks,

I'd like to start the discussion about FLIP-36 Support Interactive
Programming in Flink Table API
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

The FLIP proposes to add support for interactive programming in the Flink Table
API. Specifically, it lets users cache intermediate results (tables) and
use them in later jobs.

Even though the FLIP has been discussed in the past[1], it hasn't
formally passed a vote yet. Some of the design and implementation
details also have to change to incorporate the cluster partitions proposed in
FLIP-67[2].

Looking forward to your feedback.

Thanks,
Xuannan

[1]
https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
[2]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Becket Qin
Hi Xuannan,

Thanks for picking up the FLIP. It looks good to me overall. Some quick
comments / questions below:

1. Do we also need changes in the Java API?
2. In what cases would users want to retry reading the intermediate
result? It seems that once the intermediate result is gone, it will not be
available later without being generated again, right?
3. In the "semantic of cache() method" section, the description "The
semantic of the *cache()* method is a little different depending on whether
auto caching is enabled or not." does not seem to be explained.

Thanks,

Jiangjie (Becket) Qin



On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Becket,

Thanks for the comments.

On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <[hidden email]> wrote:

> Hi Xuannan,
>
> Thanks for picking up the FLIP. It looks good to me overall. Some quick
> comments / questions below:
>
> 1. Do we also need changes in the Java API?
>

Yes, the public interface changes to Table and TableEnvironment should be
made in the Java API.


> 2. In what cases would users want to retry reading the intermediate
> result? It seems that once the intermediate result is gone, it will not be
> available later without being generated again, right?
>

I am not entirely sure I understand the cases you mentioned. Users
can use the cached table object returned by the .cache() method in other
jobs, and it should read the intermediate result. The intermediate result can
be gone in the following three cases: 1. the user explicitly calls the
invalidateCache() method, 2. the TableEnvironment is closed, or 3. a failure
happens on the TM. When that happens, the intermediate result will not be
available unless it is re-generated.
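
To make the three cases above concrete, here is a toy Java model of the cache lifecycle. It is only a sketch under stated assumptions: `ToyCacheEnvironment`, `read`, and `simulateTaskManagerFailure` are hypothetical names invented for illustration and are not Flink APIs, and the model assumes a missing result is simply recomputed from the original DAG on the next read.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Toy model only: none of these names are Flink APIs.
final class ToyCacheEnvironment {
    // Cached intermediate results, keyed by a hypothetical table id.
    private final Map<String, List<Integer>> cachedResults = new HashMap<>();
    private int computations = 0;

    // Returns the cached rows if present; otherwise runs the original DAG
    // (assumption: a lost result is re-generated on the next read).
    List<Integer> read(String tableId, Supplier<List<Integer>> originalDag) {
        List<Integer> rows = cachedResults.get(tableId);
        if (rows == null) {
            computations++;
            rows = originalDag.get();
            cachedResults.put(tableId, rows);
        }
        return rows;
    }

    // Case 1: the user explicitly invalidates the cache.
    void invalidateCache(String tableId) {
        cachedResults.remove(tableId);
    }

    // Case 2: the environment is closed, dropping every cached result.
    void close() {
        cachedResults.clear();
    }

    // Case 3: a TaskManager failure loses the stored results.
    void simulateTaskManagerFailure() {
        cachedResults.clear();
    }

    // Number of times the original DAG actually ran.
    int computations() {
        return computations;
    }
}
```

Reading the same table twice runs the DAG once; after any of the three events the next read runs it again.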

> 3. In the "semantic of cache() method" section, the description "The
> semantic of the *cache()* method is a little different depending on whether
> auto caching is enabled or not." does not seem to be explained.
>

This line is actually outdated and should be removed, as we are not adding
the auto caching functionality in this FLIP. Auto caching will be added in
the future, and the semantics of cache() when auto caching is enabled will
be discussed in detail in a new FLIP. I will remove the description to avoid
further confusion.



Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Becket Qin
Hi Xuannan,

> I am not entirely sure I understand the cases you mentioned. Users
> can use the cached table object returned by the .cache() method in other
> jobs, and it should read the intermediate result. The intermediate result can
> be gone in the following three cases: 1. the user explicitly calls the
> invalidateCache() method, 2. the TableEnvironment is closed, or 3. a failure
> happens on the TM. When that happens, the intermediate result will not be
> available unless it is re-generated.


What confused me was why we need a *cache.retries.max* config at all.
Shouldn't the missing intermediate result always be automatically
re-generated if it is gone?

Thanks,

Jiangjie (Becket) Qin


On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <[hidden email]> wrote:


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Becket,

The intermediate result will indeed be automatically re-generated by
resubmitting the original DAG. However, that job could fail as well. In that
case, we need to decide whether to resubmit the original DAG again to
re-generate the intermediate result or to give up and throw an exception to
the user. The config indicates how many resubmissions should happen
before giving up.
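
The bounded-resubmission behavior described above can be sketched in plain Java. This is an illustration only: `BoundedResubmit` and `runWithRetries` are hypothetical names, the `maxRetries` parameter stands in for the *cache.retries.max* config discussed in this thread, and a `Supplier` stands in for submitting the original DAG.

```java
import java.util.function.Supplier;

// Hypothetical helper, not part of Flink.
final class BoundedResubmit {
    // Runs the original DAG once, then resubmits it up to maxRetries more
    // times. If every attempt fails, gives up and surfaces the last failure.
    static <T> T runWithRetries(Supplier<T> originalDag, int maxRetries) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                return originalDag.get();
            } catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        throw new IllegalStateException(
                "gave up after " + maxRetries + " resubmissions", last);
    }
}
```

With `maxRetries = 2`, a DAG that fails twice and then succeeds still returns a result, while one that always fails ends in an exception to the caller.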

Thanks,
Xuannan

On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <[hidden email]> wrote:


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Becket Qin
Hi Xuannan,

Suppose a user submits job 1, which generates a cached intermediate result,
and later submits job 2, which should ideally use that intermediate result.
In that case, if job 2 fails because the intermediate result is missing, job 2
should be retried with its full DAG. When job 2 then runs, it will
also re-generate the cache. However, once job 2 has fallen back to the
original DAG, shouldn't it just be treated as an ordinary job that follows the
recovery strategy? Having a separate configuration seems a little
confusing. In other words, re-generating the cache is just a byproduct of
running the full DAG of job 2, not its main purpose. It is just like
when job 1 runs to generate the cache: it does not have a separate retry
config to make sure the cache is generated. If it fails, it just fails like
an ordinary job.
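
A minimal Java sketch of this fallback idea, under stated assumptions: `FallbackSketch`, `runJob2`, and `loseCache` are hypothetical names, the upstream `Supplier` stands in for the part of the DAG that job 1 originally ran, and doubling the value stands in for job 2's own work. There is deliberately no cache-specific retry logic; a failure in the upstream part propagates like any other job failure.

```java
import java.util.function.Supplier;

// Hypothetical model, not a Flink API.
final class FallbackSketch {
    // Cached intermediate result; null means it is missing.
    private Integer cachedIntermediate = null;

    // Runs job 2. If the cache is missing, job 2 falls back to its full DAG,
    // and re-generating the cache is only a byproduct of that run.
    int runJob2(Supplier<Integer> upstream) {
        int intermediate;
        if (cachedIntermediate != null) {
            intermediate = cachedIntermediate;
        } else {
            // Full DAG: any exception here propagates as an ordinary failure.
            intermediate = upstream.get();
            cachedIntermediate = intermediate;
        }
        return intermediate * 2; // stand-in for job 2's own computation
    }

    // Simulates the intermediate result being lost.
    void loseCache() {
        cachedIntermediate = null;
    }

    boolean hasCache() {
        return cachedIntermediate != null;
    }
}
```

Any retrying would then be left to whatever recovery strategy wraps the call to `runJob2`, which is the point of treating the fallback as an ordinary job.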

What do you think?

Thanks,

Jiangjie (Becket) Qin

On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <[hidden email]> wrote:


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Becket,

You are right. It makes sense to treat the retry of job 2 as an ordinary job,
and the config does introduce some unnecessary confusion. Thank you for your
comment. I will update the FLIP.

Best,
Xuannan

On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <[hidden email]> wrote:


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi folks,

FLIP-36 has been updated according to the discussion with Becket. In the
meantime, any comments are very welcome.

If there are no further comments, I would like to start the voting
thread by tomorrow.

Thanks,
Xuannan


On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]> wrote:

> Hi Becket,
>
> You are right. It makes sense to treat retry of job 2 as an ordinary job.
> And the config does introduce some unnecessary confusion. Thank you for you
> comment. I will update the FLIP.
>
> Best,
> Xuannan
>
> On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <[hidden email]> wrote:
>
>> Hi Xuannan,
>>
>> If user submits Job 1 and generated a cached intermediate result. And
>> later
>> on, user submitted job 2 which should ideally use the intermediate result.
>> In that case, if job 2 failed due to missing the intermediate result, Job
>> 2
>> should be retried with its full DAG. After that when Job 2 runs, it will
>> also re-generate the cache. However, once job 2 has fell back to the
>> original DAG, should it just be treated as an ordinary job that follow the
>> recovery strategy? Having a separate configuration seems a little
>> confusing. In another word, re-generating the cache is just a byproduct of
>> running the full DAG of job 2, but is not the main purpose. It is just
>> like
>> when job 1 runs to generate cache, it does not have a separate config of
>> retry to make sure the cache is generated. If it fails, it just fail like
>> an ordinary job.
>>
>> What do you think?
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>> On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <[hidden email]> wrote:
>>
>> > Hi Becket,
>> >
>> > The intermediate result will indeed be automatically re-generated by
>> > resubmitting the original DAG. And that job could fail as well. In that
>> > case, we need to decide if we should resubmit the original DAG to
>> > re-generate the intermediate result or give up and throw an exception to
>> > the user. And the config is to indicate how many resubmit should happen
>> > before giving up.
>> >
>> > Thanks,
>> > Xuannan
>> >
>> > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <[hidden email]>
>> wrote:
>> >
>> > > Hi Xuannan,
>> > >
>> > >  I am not entirely sure if I understand the cases you mentioned. The
>> > users
>> > > > can use the cached table object returned by the .cache() method in
>> > other
>> > > > job and it should read the intermediate result. The intermediate
>> result
>> > > can
>> > > > gone in the following three cases: 1. the user explicitly call the
>> > > > invalidateCache() method 2. the TableEnvironment is closed 3.
>> failure
>> > > > happens on the TM. When that happens, the intermeidate result will
>> not
>> > be
>> > > > available unless it is re-generated.
>> > >
>> > >
>> > > What confused me was that why do we need to have a *cache.retries.max
>> > > *config?
>> > > Shouldn't the missing intermediate result always be automatically
>> > > re-generated if it is gone?
>> > >
>> > > Thanks,
>> > >
>> > > Jiangjie (Becket) Qin
>> > >
>> > >
>> > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <[hidden email]>
>> > wrote:
>> > >
>> > > > Hi Becket,
>> > > >
>> > > > Thanks for the comments.
>> > > >
>> > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <[hidden email]>
>> > wrote:
>> > > >
>> > > > > Hi Xuannan,
>> > > > >
>> > > > > Thanks for picking up the FLIP. It looks good to me overall. Some
>> > quick
>> > > > > comments / questions below:
>> > > > >
>> > > > > 1. Do we also need changes in the Java API?
>> > > > >
>> > > >
>> > > > Yes, the public interface of Table and TableEnvironment should be
>> made
>> > in
>> > > > the Java API.
>> > > >
>> > > >
>> > > > > 2. What are the cases that users may want to retry reading the
>> > > > intermediate
>> > > > > result? It seems that once the intermediate result has gone, it
>> will
>> > > not
>> > > > be
>> > > > > available later without being generated again, right?
>> > > > >
>> > > >
>> > > >  I am not entirely sure if I understand the cases you mentioned. The
>> > > users
>> > > > can use the cached table object returned by the .cache() method in
>> > other
>> > > > job and it should read the intermediate result. The intermediate
>> result
>> > > can
>> > > > gone in the following three cases: 1. the user explicitly call the
>> > > > invalidateCache() method 2. the TableEnvironment is closed 3.
>> failure
>> > > > happens on the TM. When that happens, the intermeidate result will
>> not
>> > be
>> > > > available unless it is re-generated.
>> > > >
>> > > > 3. In the "semantic of cache() method" section, the description "The
>> > > > > semantic of the *cache() *method is a little different depending
>> on
>> > > > whether
>> > > > > auto caching is enabled or not." seems not explained.
>> > > > >
>> > > >
>> > > > This line is actually outdated and should be removed, as we are not
>> > > adding
>> > > > the auto caching functionality in this FLIP. Auto caching will be
>> added
>> > > in
>> > > > the future, and the semantic of cache() when auto caching is enabled
>> > will
>> > > > be discussed in detail by a new FLIP. I will remove the descriptor
>> to
>> > > avoid
>> > > > further confusion.
>> > > >
>> > > >
>> > > > > Thanks,
>> > > > >
>> > > > > Jiangjie (Becket) Qin
>> > > > >
>> > > > >
>> > > > >
>> > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <[hidden email]
>> >
>> > > > wrote:
>> > > > >
>> > > > > > Hi folks,
>> > > > > >
>> > > > > > I'd like to start the discussion about FLIP-36 Support
>> Interactive
>> > > > > > Programming in Flink Table API
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>> > > > > >
>> > > > > > The FLIP proposes to add support for interactive programming in
>> > Flink
>> > > > > Table
>> > > > > > API. Specifically, it let users cache the intermediate
>> > > results(tables)
>> > > > > and
>> > > > > > use them in the later jobs.
>> > > > > >
>> > > > > > Even though the FLIP has been discussed in the past[1], the FLIP
>> > > hasn't
>> > > > > > formally passed the vote yet. And some of the design and
>> > > implementation
>> > > > > > detail have to change to incorporates the cluster partition
>> > proposed
>> > > in
>> > > > > > FLIP-67[2].
>> > > > > >
>> > > > > > Looking forward to your feedback.
>> > > > > >
>> > > > > > Thanks,
>> > > > > > Xuannan
>> > > > > >
>> > > > > > [1]
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
>> > > > > > [2]
>> > > > > >
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>> https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
>> > > > > >
>> > > > >
>> > > >
>> > >
>> >
>>
>

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi,

There is some feedback from @Timo and @Kurt in the voting thread for
FLIP-36, and I want to share my thoughts here.

1. What would FLIP-36 look like after FLIP-84?
I don't think FLIP-84 will affect FLIP-36 from the public API perspective.
Users can call .cache() on a table object, and the cached table will be
generated whenever the table job is triggered to execute, either by
Table#executeInsert or StatementSet#execute. FLIP-36 should be aware of the
changes made by FLIP-84, but that shouldn't be a problem. At the end of the
day, FLIP-36 only requires the ability to add a sink to a node, submit a
table job with multiple sinks, and replace the cached table with a source.

2. How can we support cache in a multi-statement SQL file?
The most intuitive way to support cache in a multi-statement SQL file is to
use a view, where the view corresponds to a cached table.

3. Unifying the cached table and materialized views
It is true that the cached table and the materialized view are similar in
some ways. However, I think the materialized view is a more complex concept.
First, a materialized view requires some kind of refresh mechanism to stay
synchronized with the table. Second, the life cycle of a materialized view
is longer: it should remain accessible after the application exits and be
accessible from other applications, while the cached table is only
accessible in the application where it is created. The cached table is
introduced to avoid recomputing an intermediate table, in order to support
interactive programming in the Flink Table API. I think the materialized
view needs more discussion and certainly deserves a whole new FLIP.
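
To make the three requirements in point 1 concrete (add a sink to a node, submit a job with multiple sinks, replace the cached table with a source), here is a minimal toy sketch in Python. It does not use Flink at all: Node, ToyPlanner, and the CacheSink/CacheSource operator names are hypothetical and exist only for illustration, so please read it as a sketch of the idea under those assumptions rather than the FLIP's actual design.

```python
from dataclasses import dataclass
from typing import Dict, List, Set, Tuple


@dataclass(frozen=True)
class Node:
    """One operator in a toy query plan (hypothetical, not a Flink class)."""
    op: str
    inputs: Tuple["Node", ...] = ()


class ToyPlanner:
    """Hypothetical planner that tracks which plan nodes are cached."""

    def __init__(self) -> None:
        self.marked: Set[Node] = set()     # nodes the user called cache() on
        self.cached: Dict[Node, str] = {}  # node -> id of its materialized result

    def cache(self, node: Node) -> Node:
        # Marking is lazy: nothing is materialized until a job runs.
        self.marked.add(node)
        return node

    def _pending(self, node: Node, out: List[Node]) -> None:
        # Collect marked nodes reachable from `node` that are not yet materialized.
        if node in self.cached:
            return  # will be read from the cache, so do not look below it
        for child in node.inputs:
            self._pending(child, out)
        if node in self.marked and node not in out:
            out.append(node)

    def _rewrite(self, node: Node) -> Node:
        # Replace every already-materialized subtree with a cache source.
        if node in self.cached:
            return Node(f"CacheSource[{self.cached[node]}]")
        return Node(node.op, tuple(self._rewrite(c) for c in node.inputs))

    def submit(self, root: Node) -> List[Node]:
        # Build one job: the user's sink plus one extra sink per pending cache.
        pending: List[Node] = []
        self._pending(root, pending)
        sinks = [Node("UserSink", (self._rewrite(root),))]
        new_ids: Dict[Node, str] = {}
        for node in pending:
            cache_id = f"cache-{len(self.cached) + len(new_ids)}"
            new_ids[node] = cache_id
            sinks.append(Node(f"CacheSink[{cache_id}]", (self._rewrite(node),)))
        self.cached.update(new_ids)
        return sinks


scan = Node("Scan[orders]")
filtered = Node("Filter", (scan,))
planner = ToyPlanner()
planner.cache(filtered)
job1 = planner.submit(Node("Aggregate", (filtered,)))  # user sink + cache sink
job2 = planner.submit(Node("Sort", (filtered,)))       # reads from the cache
```

In this toy model the first job carries two sinks, and the second job's plan starts from a cache source instead of repeating the scan and filter.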

Please let me know your thoughts.

Best,
Xuannan

On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]> wrote:

> Hi folks,
>
> The FLIP-36 is updated according to the discussion with Becket. In the
> meantime, any comments are very welcome.
>
> If there are no further comments, I would like to start the voting
> thread by tomorrow.
>
> Thanks,
> Xuannan
>
>
> On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]> wrote:
>
>> Hi Becket,
>>
>> You are right. It makes sense to treat the retry of job 2 as an ordinary
>> job. And the config does introduce some unnecessary confusion. Thank you
>> for your comment. I will update the FLIP.
>>
>> Best,
>> Xuannan
>>
>> On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <[hidden email]> wrote:
>>
>>> Hi Xuannan,
>>>
>>> Suppose a user submits job 1, which generates a cached intermediate
>>> result, and later submits job 2, which should ideally use that
>>> intermediate result. If job 2 fails because the intermediate result is
>>> missing, job 2 should be retried with its full DAG. When job 2 then runs,
>>> it will also re-generate the cache. However, once job 2 has fallen back to
>>> the original DAG, shouldn't it just be treated as an ordinary job that
>>> follows the recovery strategy? Having a separate configuration seems a
>>> little confusing. In other words, re-generating the cache is just a
>>> byproduct of running the full DAG of job 2, not the main purpose. It is
>>> just like when job 1 runs to generate the cache: it does not have a
>>> separate retry config to make sure the cache is generated. If it fails, it
>>> just fails like an ordinary job.
>>>
>>> What do you think?
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <[hidden email]>
>>> wrote:
>>>
>>> > Hi Becket,
>>> >
>>> > The intermediate result will indeed be automatically re-generated by
>>> > resubmitting the original DAG. And that job could fail as well. In that
>>> > case, we need to decide whether to resubmit the original DAG to
>>> > re-generate the intermediate result or to give up and throw an exception
>>> > to the user. The config indicates how many resubmissions should happen
>>> > before giving up.
>>> >
>>> > Thanks,
>>> > Xuannan
>>> >
>>> > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <[hidden email]>
>>> wrote:
>>> >
>>> > > Hi Xuannan,
>>> > >
>>> > > > I am not entirely sure I understand the cases you mentioned. Users
>>> > > > can use the cached table object returned by the .cache() method in
>>> > > > another job, and that job should read the intermediate result. The
>>> > > > intermediate result can be gone in the following three cases: 1. the
>>> > > > user explicitly calls the invalidateCache() method; 2. the
>>> > > > TableEnvironment is closed; 3. a failure happens on the TM. When
>>> > > > that happens, the intermediate result will not be available unless
>>> > > > it is re-generated.
>>> > >
>>> > >
>>> > > What confused me was why we need to have a *cache.retries.max* config.
>>> > > Shouldn't the missing intermediate result always be automatically
>>> > > re-generated if it is gone?
>>> > >
>>> > > Thanks,
>>> > >
>>> > > Jiangjie (Becket) Qin
>>> > >
>>> > >
>>> > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <[hidden email]>
>>> > wrote:
>>> > >
>>> > > > Hi Becket,
>>> > > >
>>> > > > Thanks for the comments.
>>> > > >
>>> > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <[hidden email]>
>>> > wrote:
>>> > > >
>>> > > > > Hi Xuannan,
>>> > > > >
>>> > > > > Thanks for picking up the FLIP. It looks good to me overall. Some
>>> > quick
>>> > > > > comments / questions below:
>>> > > > >
>>> > > > > 1. Do we also need changes in the Java API?
>>> > > > >
>>> > > >
>>> > > > Yes, the public interface of Table and TableEnvironment should be
>>> made
>>> > in
>>> > > > the Java API.
>>> > > >
>>> > > >
>>> > > > > 2. What are the cases that users may want to retry reading the
>>> > > > intermediate
>>> > > > > result? It seems that once the intermediate result has gone, it
>>> will
>>> > > not
>>> > > > be
>>> > > > > available later without being generated again, right?
>>> > > > >
>>> > > >
>>> > > > I am not entirely sure I understand the cases you mentioned. Users
>>> > > > can use the cached table object returned by the .cache() method in
>>> > > > another job, and that job should read the intermediate result. The
>>> > > > intermediate result can be gone in the following three cases: 1. the
>>> > > > user explicitly calls the invalidateCache() method; 2. the
>>> > > > TableEnvironment is closed; 3. a failure happens on the TM. When
>>> > > > that happens, the intermediate result will not be available unless
>>> > > > it is re-generated.
>>> > > >
>>> > > > 3. In the "semantic of cache() method" section, the description
>>> "The
>>> > > > > semantic of the *cache() *method is a little different depending
>>> on
>>> > > > whether
>>> > > > > auto caching is enabled or not." seems not explained.
>>> > > > >
>>> > > >
>>> > > > This line is actually outdated and should be removed, as we are not
>>> > > > adding the auto caching functionality in this FLIP. Auto caching
>>> > > > will be added in the future, and the semantics of cache() when auto
>>> > > > caching is enabled will be discussed in detail in a new FLIP. I will
>>> > > > remove the description to avoid further confusion.
>>> > > >
>>> > > >
>>> > > > > Thanks,
>>> > > > >
>>> > > > > Jiangjie (Becket) Qin
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
>>> [hidden email]>
>>> > > > wrote:
>>> > > > >
>>> > > > > > Hi folks,
>>> > > > > >
>>> > > > > > I'd like to start the discussion about FLIP-36 Support
>>> Interactive
>>> > > > > > Programming in Flink Table API
>>> > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>> > > > > >
>>> > > > > > The FLIP proposes to add support for interactive programming in
>>> > Flink
>>> > > > > Table
>>> > > > > > API. Specifically, it let users cache the intermediate
>>> > > results(tables)
>>> > > > > and
>>> > > > > > use them in the later jobs.
>>> > > > > >
>>> > > > > > Even though the FLIP has been discussed in the past[1], the
>>> FLIP
>>> > > hasn't
>>> > > > > > formally passed the vote yet. And some of the design and
>>> > > implementation
>>> > > > > > detail have to change to incorporates the cluster partition
>>> > proposed
>>> > > in
>>> > > > > > FLIP-67[2].
>>> > > > > >
>>> > > > > > Looking forward to your feedback.
>>> > > > > >
>>> > > > > > Thanks,
>>> > > > > > Xuannan
>>> > > > > >
>>> > > > > > [1]
>>> > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
>>> > > > > > [2]
>>> > > > > >
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>> https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
>>> > > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
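
The fallback behaviour agreed on in the quoted exchange above (a job that finds its cached input missing simply runs its full DAG, and re-generating the cache is a byproduct) can be sketched roughly as follows. This is a toy Python model, not Flink code: CacheMissingError, read_cache, compute_intermediate, and the "t1" key are all hypothetical names chosen for illustration.

```python
class CacheMissingError(Exception):
    """Raised when a cached intermediate result is no longer available."""


def read_cache(cache, key):
    # Stand-in for reading a cached intermediate result.
    if key not in cache:
        raise CacheMissingError(key)
    return cache[key]


def compute_intermediate(source):
    # Stand-in for the expensive upstream part of the DAG.
    return [x * 2 for x in source]


def run_job2(source, cache):
    """Run 'job 2': prefer the cached input, otherwise run the full DAG."""
    try:
        intermediate = read_cache(cache, "t1")
    except CacheMissingError:
        # Fall back to the full DAG. There is no cache-specific retry counter:
        # if this recomputation fails, the error propagates like any other
        # job failure and the ordinary recovery strategy applies.
        intermediate = compute_intermediate(source)
        cache["t1"] = intermediate  # re-populating the cache is a byproduct
    return sum(intermediate)


cache = {}
first = run_job2([1, 2, 3], cache)   # cache is missing, so the full DAG runs
second = run_job2([1, 2, 3], cache)  # cache is present, so it is read back
```

The point of the sketch is only that no separate setting such as cache.retries.max is needed in this model; the fallback path is just an ordinary job run.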

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi folks,

As the feature freeze of Flink 1.11 has passed and the release branch has
been cut, I'd like to revive this discussion thread of FLIP-36[1]. A quick
summary of FLIP-36:

The FLIP proposes to add support for interactive programming in the Flink
Table API. Specifically, it lets users cache intermediate results (tables)
and use them in later jobs.

Any comments or suggestions are much appreciated. I'd like to know what the
community thinks about the FLIP. In the meantime, I plan to revive the vote
thread if no comments are received within 2 days.

Best,
Xuannan

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink



Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi folks,

I'd like to revive the discussion about FLIP-36 Support Interactive Programming in Flink Table API
https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink

The FLIP proposes to add support for interactive programming in the Flink Table API. Specifically, it lets users cache intermediate results (tables) and use them in later jobs, which avoids recomputing those results.

I am looking forward to any opinions and suggestions from the community.

Best,
Xuannan

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Kurt Young
Hi Xuannan,

Thanks for the detailed design doc. It clearly describes how the API looks
and how it interacts with the Flink runtime.
However, the part that relates to SQL's optimizer is kind of blurry. To be
more precise, I have the following questions:

1. How do you identify the CachedTable? I can imagine there would be a map
representing the cache; how do you compare the keys of the map? One
approach is to compare them as Java objects, which is simple but has
limited scope. For example, if users create another table through some
interfaces of TableEnvironment and that table is exactly the same as the
cached one, you won't be able to identify it. Another choice is calculating
the "signature" or "digest" of the cached table, which involves a string
representation of the whole subtree represented by the cached table. I
don't think Flink currently provides such a mechanism around Table, though.

2. How does the CachedTable affect the optimizer? Specifically, will you
have a dedicated QueryOperation for it, and a dedicated logical and
physical RelNode? I also don't see a description of how it works with the
current optimization phases: from Operation to Calcite rel node, then to
Flink's logical and physical nodes, which are finally translated to
Flink's exec nodes. There are also other optimizations, such as the
deadlock breaker and sub-plan reuse inside the optimizer; I'm not sure
whether the logic dealing with cached tables can be orthogonal to all of
these. Hence I would expect a more detailed description here.

3. What's the effect of calling TableEnvironment.close()? You already
explained that this would drop all caches this table env has. Could you
also explain whether other functionality still works for this table env?
For example, can users still create/drop tables/databases/functions
through this table env? What happens to the catalog and all temporary
objects of this table env?

One minor comment: I noticed you used some non-existent APIs in the examples
you gave, like table.collect(), which is a little misleading.
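
To make the two lookup strategies from question 1 concrete, here is a minimal sketch in plain Python. It is not Flink code: `Table`, `IdentityCache`, and `DigestCache` are hypothetical stand-ins invented for illustration, and the digest is simply a SHA-256 hash over a string rendering of the sub-tree, which is only one possible way to build such a signature.

```python
import hashlib


class Table:
    """Hypothetical stand-in for a table: an operation plus its input tables."""

    def __init__(self, op, *inputs):
        self.op = op
        self.inputs = inputs

    def digest(self):
        # Hash a string representation of the whole sub-tree under this table.
        text = self.op + "(" + ",".join(t.digest() for t in self.inputs) + ")"
        return hashlib.sha256(text.encode("utf-8")).hexdigest()


class IdentityCache:
    """Approach 1: the key is the table object itself."""

    def __init__(self):
        self._entries = {}

    def put(self, table, result):
        # Keep a reference to the table so its id() cannot be reused.
        self._entries[id(table)] = (table, result)

    def get(self, table):
        entry = self._entries.get(id(table))
        return entry[1] if entry is not None else None


class DigestCache:
    """Approach 2: the key is a digest of the sub-tree."""

    def __init__(self):
        self._entries = {}

    def put(self, table, result):
        self._entries[table.digest()] = result

    def get(self, table):
        return self._entries.get(table.digest())
```

With these classes, a table rebuilt from scratch with the same operations misses in `IdentityCache` but hits in `DigestCache`, which is exactly the difference in scope described above.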

Best,
Kurt



Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Kurt,

Thanks for the comments.

1. How do you identify the CachedTable?
For the current design proposed in FLIP-36, we are using the first approach you mentioned, where the key of the map is the cached Table Java object. I think it is fine that we cannot identify another table that represents the same DAG, and that such a table does not use the cached intermediate result, because we want caching to be explicit. As mentioned in the FLIP, the cache API will return a Table object, and the user has to use the returned Table object to make use of the cached table. The rationale is that if the user builds the same DAG from scratch with some TableEnvironment instead of using the cached table object, the user probably doesn't want to use the cache.

2. How does the CachedTable affect the optimizer?
We try to make the logic dealing with the cached table as orthogonal to the optimizer as possible. That's why we introduce a special sink when we are going to cache a table and a special source when we are going to use a cached table. This way, we can let the optimizer do its work, and the logic of modifying the job graph can happen in the job graph generator. We can recognize the cached node by the special sink and source.

3. What's the effect of calling TableEnvironment.close()?
We introduce the close method to prevent leaking the cached tables when the user is done with the table environment. Therefore, it makes more sense that the table environment, including all of its functionality, should not be used after closing. Otherwise, we should rename the close method to clearAllCache or something similar.
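
The three answers above can be sketched as a toy model in plain Python. This is only an illustration under stated assumptions, not the FLIP-36 implementation: `Node`, `SketchTableEnv`, and the step names `CacheSink` and `CacheSource` are hypothetical, the plan is flattened to a list even though the real cache sink would be a side branch, and nothing here touches the optimizer or the job graph generator.

```python
class Node:
    """Hypothetical plan node: an operation applied to an optional input node."""

    def __init__(self, op, input_node=None):
        self.op = op
        self.input_node = input_node


class SketchTableEnv:
    """Toy environment; the names are illustrative and are not Flink's API."""

    def __init__(self):
        self._cache_state = {}  # id(node) -> [node, already materialized?]
        self._closed = False

    def cache(self, node):
        self._ensure_open()
        cached = Node("cache", node)  # the returned object itself is the key
        self._cache_state[id(cached)] = [cached, False]
        return cached

    def execute(self, node):
        """Return the flattened plan (source first) that would be submitted."""
        self._ensure_open()
        steps, to_materialize = [], []
        current = node
        while current is not None:
            state = self._cache_state.get(id(current))
            if state is None:
                steps.append(current.op)
            elif state[1]:
                steps.append("CacheSource")  # read stored result, skip upstream
                break
            else:
                steps.append("CacheSink")  # extra sink that stores the result
                to_materialize.append(state)
            current = current.input_node
        for state in to_materialize:
            state[1] = True
        return list(reversed(steps))

    def close(self):
        # Drop every cached result; the environment is unusable afterwards.
        self._cache_state.clear()
        self._closed = True

    def _ensure_open(self):
        if self._closed:
            raise RuntimeError("table environment is closed")
```

In this model, the first job that goes through the object returned by `cache()` gets a `CacheSink` step, later jobs through the same object start from `CacheSource`, a DAG rebuilt from scratch ignores the cache entirely, and any call after `close()` raises an error.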

And thanks for pointing out the non-existent APIs used in the given examples. I have updated the examples in the FLIP accordingly.

Best,
Xuannan

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Kurt Young
Thanks for the reply. I have one more comment about the effect on the
optimizer. Even if you are trying to make the cached table as orthogonal
to the optimizer as possible by introducing a special sink, it is still
not clear why this approach is safe. Maybe you can add a description of
the process from the API to the JobGraph; otherwise I can't be sure that
everyone reviewing the design doc will have the same understanding of it.
I'm also quite sure that some of the existing mechanisms will be affected
by this special sink, e.g. multi-sink optimization.

Best,
Kurt


On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <[hidden email]> wrote:

> Hi Kurt,
>
> Thanks for the comments.
>
> 1. How do you identify the CachedTable?
> For the current design proposed in FLIP-36, we are using the first
> approach you mentioned, where the key of the map is the Cached Table java
> object. I think it is fine not to be able to identify another table
> representing the same DAG and not using the cached intermediate result
> because we want to make the caching table explicit. As mentioned in the
> FLIP, the cache API will return a Table object. And the user has to use the
> returned Table object to make use of the cached table. The rationale is
> that if the user builds the same DAG from scratch with some
> TableEnvironment instead of using the cached table object, the user
> probably doesn't want to use the cache.
>
> 2. How does the CachedTable affect the optimizer?
> We try to make the logic dealing with the cached table be as orthogonal to
> the optimizer as possible. That's why we introduce a special sink when we
> are going to cache a table and a special source when we are going to use a
> cached table. This way, we can let the optimizer does it works, and the
> logic of modifying the job graph can happen in the job graph generator. We
> can recognize the cached node with the special sink and source.
>
> 3. What's the effect of calling TableEnvironment.close()?
> We introduce the close method to prevent leaking of the cached table when
> the user is done with the table environment. Therefore, it makes more sense
> that the table environment, including all of its functionality, should not
> be used after closing. Otherwise, we should rename the close method to
> clearAllCache or something similar.
>
> And thanks for pointing out the use of not existing API used in the given
> examples. I have updated the examples in the FLIP accordingly.
>
> Best,
> Xuannan
> On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[hidden email]>, wrote:
> > Hi Xuannan,
> >
> > Thanks for the detailed design doc, it described clearly how the API looks
> > and how to interact with Flink runtime.
> > However, the part which relates to SQL's optimizer is kind of blurry. To be
> > more precise, I have the following questions:
> >
> > 1. How do you identify the CachedTable? I can imagine there would be a map
> > representing the cache; how do you compare the keys of the map? One
> > approach is to compare them as Java objects, which is simple but has
> > limited scope. For example, if users create another table using some
> > interfaces of TableEnvironment, and that table is exactly the same as the
> > cached one, you won't be able to identify it. Another choice is calculating
> > the "signature" or "digest" of the cached table, which involves a string
> > representation of the whole sub tree represented by the cached table.
> > I don't think Flink currently provides such a mechanism around Table
> > though.
> >
> > 2. How does the CachedTable affect the optimizer? Specifically, will you
> > have a dedicated QueryOperation for it, and will you have a dedicated
> > logical & physical RelNode for it? I also don't see a description of how
> > it works with the current optimization phases, from Operation to Calcite
> > rel node, and then to Flink's logical and physical nodes, which are
> > finally translated to Flink's exec nodes. There also exist other
> > optimizations such as the deadlock breaker, as well as sub plan reuse
> > inside the optimizer, and I'm not sure whether the logic dealing with
> > cached tables can be orthogonal to all of these. Hence I expect you could
> > have a more detailed description here.
> >
> > 3. What's the effect of calling TableEnvironment.close()? You already
> > explained this would drop all caches this table env has. Could you also
> > explain whether other functionality still works for this table env? For
> > example, can users still create/drop tables/databases/functions through
> > this table env? What happens to the catalog and all temporary objects of
> > this table env?
> >
> > One minor comment: I noticed you used some non-existent API in the
> > examples you gave, like table.collect(), which is a little misleading.
> >
> > Best,
> > Kurt
> >
> >
> > On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
> >
> > > Hi folks,
> > >
> > > I'd like to revive the discussion about FLIP-36 Support Interactive
> > > Programming in Flink Table API
> > >
> > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > >
> > > The FLIP proposes to add support for interactive programming in Flink
> > > Table API. Specifically, it lets users cache intermediate results
> > > (tables) and use them in later jobs to avoid recomputing them.
> > >
> > > I am looking forward to any opinions and suggestions from the community.
> > >
> > > Best,
> > > Xuannan
> > > On May 7, 2020, 5:40 PM +0800, Xuannan Su <[hidden email]>, wrote:
> > > > Hi,
> > > >
> > > > There is some feedback from @Timo and @Kurt in the voting thread for
> > > > FLIP-36 and I want to share my thoughts here.
> > > >
> > > > 1. What would FLIP-36 look like after FLIP-84?
> > > > I don't think FLIP-84 will affect FLIP-36 from the public API
> > > > perspective. Users can call .cache on a table object and the cached
> > > > table will be generated whenever the table job is triggered to execute,
> > > > either by Table#executeInsert or StatementSet#execute. I think that
> > > > FLIP-36 should be aware of the changes made by FLIP-84, but it shouldn't
> > > > be a problem. At the end of the day, FLIP-36 only requires the ability
> > > > to add a sink to a node, submit a table job with multiple sinks, and
> > > > replace the cached table with a source.
> > > >
> > > > 2. How can we support cache in a multi-statement SQL file?
> > > > The most intuitive way to support cache in a multi-statement SQL file
> > > > is by using a view, where the view corresponds to a cached table.
> > > >
> > > > 3. Unifying the cached table and materialized views
> > > > It is true that the cached table and the materialized view are similar
> > > > in some ways. However, I think the materialized view is a more complex
> > > > concept. First, a materialized view requires some kind of refresh
> > > > mechanism to synchronize with the table. Second, the life cycle of a
> > > > materialized view is longer. The materialized view should be accessible
> > > > even after the application exits and should be accessible by another
> > > > application, while the cached table is only accessible in the
> > > > application where it is created. The cached table is introduced to
> > > > avoid recomputation of an intermediate table to support interactive
> > > > programming in Flink Table API. And I think the materialized view needs
> > > > more discussion and certainly deserves a whole new FLIP.
> > > >
> > > > Please let me know your thoughts.
> > > >
> > > > Best,
> > > > Xuannan
> > > >
> > > On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]> wrote:
> > > > Hi folks,
> > > >
> > > > FLIP-36 has been updated according to the discussion with Becket. In
> > > > the meantime, any comments are very welcome.
> > > >
> > > > If there are no further comments, I would like to start the voting
> > > > thread by tomorrow.
> > > >
> > > > Thanks,
> > > > Xuannan
> > > >
> > > >
> > > > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]> wrote:
> > > > > > Hi Becket,
> > > > > >
> > > > > > You are right. It makes sense to treat the retry of job 2 as an
> > > > > > ordinary job. And the config does introduce some unnecessary
> > > > > > confusion. Thank you for your comment. I will update the FLIP.
> > > > > >
> > > > > > Best,
> > > > > > Xuannan
> > > > > >
> > > > > > > On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <[hidden email]> wrote:
> > > > > > > > Hi Xuannan,
> > > > > > > >
> > > > > > > > Suppose the user submits job 1, which generates a cached
> > > > > > > > intermediate result, and later submits job 2, which should
> > > > > > > > ideally use that intermediate result. In that case, if job 2
> > > > > > > > fails because the intermediate result is missing, job 2 should
> > > > > > > > be retried with its full DAG. When job 2 then runs, it will also
> > > > > > > > re-generate the cache. However, once job 2 has fallen back to
> > > > > > > > the original DAG, should it just be treated as an ordinary job
> > > > > > > > that follows the recovery strategy? Having a separate
> > > > > > > > configuration seems a little confusing. In other words,
> > > > > > > > re-generating the cache is just a byproduct of running the full
> > > > > > > > DAG of job 2, not the main purpose. It is just like when job 1
> > > > > > > > runs to generate the cache: it does not have a separate retry
> > > > > > > > config to make sure the cache is generated. If it fails, it
> > > > > > > > just fails like an ordinary job.
> > > > > > > >
> > > > > > > > What do you think?
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > >
> > > > > > > > Jiangjie (Becket) Qin
> > > > > > > >
> > > > > > > > On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <[hidden email]> wrote:
> > > > > > > >
> > > > > > > > > Hi Becket,
> > > > > > > > >
> > > > > > > > > The intermediate result will indeed be automatically
> > > > > > > > > re-generated by resubmitting the original DAG. And that job
> > > > > > > > > could fail as well. In that case, we need to decide whether to
> > > > > > > > > resubmit the original DAG to re-generate the intermediate
> > > > > > > > > result, or to give up and throw an exception to the user. The
> > > > > > > > > config indicates how many resubmits should happen before
> > > > > > > > > giving up.
> > > > > > > > >
> > > > > > > > > Thanks,
> > > > > > > > > Xuannan
> > > > > > > > >
> > > > > > > > > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <[hidden email]> wrote:
> > > > > > > > >
> > > > > > > > > > Hi Xuannan,
> > > > > > > > > >
> > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > > > > > > > mentioned. Users can use the cached table object returned
> > > > > > > > > > > by the .cache() method in another job, and it should read
> > > > > > > > > > > the intermediate result. The intermediate result can be
> > > > > > > > > > > gone in the following three cases: 1. the user explicitly
> > > > > > > > > > > calls the invalidateCache() method 2. the TableEnvironment
> > > > > > > > > > > is closed 3. a failure happens on the TM. When that
> > > > > > > > > > > happens, the intermediate result will not be available
> > > > > > > > > > > unless it is re-generated.
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > What confused me was why we need to have a
> > > > > > > > > > *cache.retries.max* config.
> > > > > > > > > > Shouldn't the missing intermediate result always be
> > > > > > > > > > automatically re-generated if it is gone?
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > >
> > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
> > > [hidden email]>
> > > > > > > > > wrote:
> > > > > > > > > >
> > > > > > > > > > > Hi Becket,
> > > > > > > > > > >
> > > > > > > > > > > Thanks for the comments.
> > > > > > > > > > >
> > > > > > > > > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <[hidden email]> wrote:
> > > > > > > > > > >
> > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks for picking up the FLIP. It looks good to me
> > > > > > > > > > > > overall. Some quick comments / questions below:
> > > > > > > > > > > >
> > > > > > > > > > > > 1. Do we also need changes in the Java API?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > Yes, the public interface changes to Table and
> > > > > > > > > > > TableEnvironment should be made in the Java API.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > 2. What are the cases that users may want to retry
> > > > > > > > > > > > reading the intermediate result? It seems that once the
> > > > > > > > > > > > intermediate result is gone, it will not be available
> > > > > > > > > > > > later without being generated again, right?
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > > > > > > > mentioned. Users can use the cached table object returned
> > > > > > > > > > > by the .cache() method in another job, and it should read
> > > > > > > > > > > the intermediate result. The intermediate result can be
> > > > > > > > > > > gone in the following three cases: 1. the user explicitly
> > > > > > > > > > > calls the invalidateCache() method 2. the TableEnvironment
> > > > > > > > > > > is closed 3. a failure happens on the TM. When that
> > > > > > > > > > > happens, the intermediate result will not be available
> > > > > > > > > > > unless it is re-generated.
> > > > > > > > > > >
> > > > > > > > > > > > 3. In the "semantic of cache() method" section, the
> > > > > > > > > > > > description "The semantic of the *cache() *method is a
> > > > > > > > > > > > little different depending on whether auto caching is
> > > > > > > > > > > > enabled or not." seems not explained.
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > This line is actually outdated and should be removed, as
> > > > > > > > > > > we are not adding the auto caching functionality in this
> > > > > > > > > > > FLIP. Auto caching will be added in the future, and the
> > > > > > > > > > > semantics of cache() when auto caching is enabled will be
> > > > > > > > > > > discussed in detail in a new FLIP. I will remove the
> > > > > > > > > > > description to avoid further confusion.
> > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
> > > [hidden email]>
> > > > > > > > > > > wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi folks,
> > > > > > > > > > > > >
> > > > > > > > > > > > > I'd like to start the discussion about FLIP-36 Support
> > > > > > > > > > > > > Interactive Programming in Flink Table API
> > > > > > > > > > > > >
> > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > > > > > >
> > > > > > > > > > > > > The FLIP proposes to add support for interactive
> > > > > > > > > > > > > programming in Flink Table API. Specifically, it lets
> > > > > > > > > > > > > users cache the intermediate results (tables) and use
> > > > > > > > > > > > > them in later jobs.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Even though the FLIP has been discussed in the
> > > > > > > > > > > > > past[1], it hasn't formally passed the vote yet. And
> > > > > > > > > > > > > some of the design and implementation details have to
> > > > > > > > > > > > > change to incorporate the cluster partition proposed in
> > > > > > > > > > > > > FLIP-67[2].
> > > > > > > > > > > > >
> > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Xuannan
> > > > > > > > > > > > >
> > > > > > > > > > > > > [1] https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > > > [2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
> > > > > > > > > > > > >
> > > > > > > > > > > >
> > > > > > > > > > >
> > > > > > > > > >
> > > > > > > > >
> > >
>
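
To make the identification question (question 1 in the quoted discussion) concrete, here is a minimal sketch of the object-identity approach. It is only an illustration under stated assumptions: SketchTable, CacheRegistry and the result id string are hypothetical names, not Flink classes, and the FLIP may track cached tables differently.

```java
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model of an identity-keyed cache registry; not Flink API.
final class IdentityCacheSketch {

    // Stand-in for a Table: it only carries a textual description of its DAG.
    static final class SketchTable {
        final String plan;

        SketchTable(String plan) {
            this.plan = plan;
        }
    }

    // Maps a cached table handle to the id of its intermediate result.
    static final class CacheRegistry {
        // IdentityHashMap compares keys with ==, never with equals().
        private final Map<SketchTable, String> cached = new IdentityHashMap<>();

        // Registers the table and returns the handle the user must keep using.
        SketchTable cache(SketchTable table, String resultId) {
            cached.put(table, resultId);
            return table;
        }

        Optional<String> lookup(SketchTable table) {
            return Optional.ofNullable(cached.get(table));
        }
    }

    public static void main(String[] args) {
        CacheRegistry registry = new CacheRegistry();
        SketchTable handle =
                registry.cache(new SketchTable("scan(orders) -> filter"), "result-1");
        SketchTable rebuilt = new SketchTable("scan(orders) -> filter");

        // The handle returned by cache() is found; an identical DAG built from
        // scratch is a different object and is therefore not found.
        System.out.println("handle cached:  " + registry.lookup(handle).isPresent());
        System.out.println("rebuilt cached: " + registry.lookup(rebuilt).isPresent());
    }
}
```

The trade-off is the one raised in the discussion: identity lookup is simple and explicit, but a table rebuilt from scratch never benefits from the cache, whereas a digest-based key would need a stable string representation of the whole sub tree.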

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Kurt,

Thanks for the comments.

You are right that the FLIP lacks a proper discussion of the impact on the optimizer. I have added a section that describes how the cached table works with the optimizer. I hope this resolves your concern. Please let me know if you have any further comments.

Best,
Xuannan
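
For readers of the thread, the special sink and special source idea can be sketched roughly as below. This is a toy model under loud assumptions: Node, Planner, CacheSink and CacheSource are hypothetical names, the plan is a simple linear chain, and the optimizer step is left out entirely. It is not the FLIP's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the special cache sink/source; not Flink API.
final class CacheSinkSourceSketch {

    // One operator in a linear plan; input is null for the scan.
    static final class Node {
        final String name;
        final Node input;

        Node(String name, Node input) {
            this.name = name;
            this.input = input;
        }
    }

    static final class Planner {
        // Node does not override equals/hashCode, so these sets are identity based.
        private final Set<Node> markedForCache = new HashSet<>();
        private final Set<Node> materialized = new HashSet<>();

        void cache(Node node) {
            markedForCache.add(node);
        }

        // Describes the job that would run for the plan ending at root.
        String translate(Node root) {
            List<String> ops = new ArrayList<>();
            List<Node> newlyCached = new ArrayList<>();
            for (Node n = root; n != null; n = n.input) {
                if (materialized.contains(n)) {
                    // The result already exists: read it and drop everything upstream.
                    ops.add("CacheSource[" + n.name + "]");
                    break;
                }
                if (markedForCache.contains(n)) {
                    // First job that touches the node: attach the extra sink.
                    ops.add(n.name + "+CacheSink");
                    newlyCached.add(n);
                } else {
                    ops.add(n.name);
                }
            }
            // Pretend the job succeeded, so later jobs can read the cached result.
            materialized.addAll(newlyCached);
            Collections.reverse(ops);
            return String.join(" -> ", ops);
        }
    }

    public static void main(String[] args) {
        Node scan = new Node("scan", null);
        Node filter = new Node("filter", scan);
        Planner planner = new Planner();
        planner.cache(filter);

        // First job writes the cache, second job reads it instead of rescanning.
        System.out.println(planner.translate(new Node("agg", filter)));
        System.out.println(planner.translate(new Node("count", filter)));
    }
}
```

In this model the first job gets an additional sink on the cached node, and any later job that reaches the same node starts from the cache source instead of the original upstream operators.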
On Jul 22, 2020, 4:36 PM +0800, Kurt Young <[hidden email]>, wrote:

> Thanks for the reply. I have one more comment about the effect on the
> optimizer. Even if you are trying to make the cached table as orthogonal to
> the optimizer as possible by introducing a special sink, it is still not
> clear why this approach is safe. Maybe you can add a description of the
> process from API to JobGraph; otherwise I can't be sure that everyone
> reviewing the design doc will have the same picture of it. And I'm also
> quite sure some of the existing mechanisms will be affected by this special
> sink, e.g. multi sink optimization.
>
> Best,
> Kurt

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi folks,

It seems that all the raised concerns so far have been resolved. I plan to start a voting thread for FLIP-36 early next week if there are no comments.

Thanks,
Xuannan

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Timo Walther-2
Hi Xuannan,

Sorry for joining the discussion so late. I agree that this is a very
nice and useful feature. However, the impact it has on many components
in the stack requires more discussion, in my opinion.

1) Separation of concerns:
The current design seems to mix different layers. We should make sure
that all layers do what they are supposed to do:

1a) The FLIP states: "The cache() method returns a new Table object with
a flag set."

The `Table` object is just a thin API class that wraps a
`QueryOperation`. Other than that the `Table` object should not contain
further state. The tree of `QueryOperation` should be an immutable,
independent data structure that can be passed around and will eventually
be passed to the `Planner`.

The mentioned `CacheSink` should be added by the optimizer. It is not
the responsibility of the API to perform optimizer-like tasks. A call to
`t1.cache()` should simply wrap the original operation into something
like `CacheOperation(t1.getQueryOperation)`. A `CacheOperation` could
still extend from `QueryOperation` and assign a unique string identifier
already. A specialized `StreamTransformation` would be necessary during
translation.
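
To make 1a) more concrete, here is a rough sketch of what I mean by
wrapping instead of flagging. The names `QueryOp`, `ScanOp` and `CacheOp`
are hypothetical stand-ins invented for this mail; they are not the real
`QueryOperation` classes. The only point is that the wrapper is an
immutable tree node carrying an identifier, and that nothing sink-related
happens in the API layer.

```java
import java.util.Collections;
import java.util.List;
import java.util.UUID;

// Hypothetical stand-in for a query operation node; NOT Flink's real interface.
interface QueryOp {
    List<QueryOp> children();
    String summary();
}

// A leaf operation, e.g. a scan of a registered table.
final class ScanOp implements QueryOp {
    private final String tableName;

    ScanOp(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public List<QueryOp> children() {
        return Collections.emptyList();
    }

    @Override
    public String summary() {
        return "Scan(" + tableName + ")";
    }
}

// Wraps another operation and marks it as a cache candidate. It stores only
// an identifier and the child; adding a sink would be left to the planner.
final class CacheOp implements QueryOp {
    private final String cacheId;
    private final QueryOp child;

    CacheOp(QueryOp child) {
        // Assumption: a random UUID is good enough as the unique string id.
        this.cacheId = UUID.randomUUID().toString();
        this.child = child;
    }

    String cacheId() {
        return cacheId;
    }

    @Override
    public List<QueryOp> children() {
        return Collections.singletonList(child);
    }

    @Override
    public String summary() {
        return "Cache(" + child.summary() + ")";
    }
}

final class CacheOpDemo {
    public static void main(String[] args) {
        QueryOp t1 = new ScanOp("orders");
        CacheOp cached = new CacheOp(t1);
        // The original tree is untouched; the cached variant is a new node.
        System.out.println(t1.summary());
        System.out.println(cached.summary());
    }
}
```

With this shape the planner can recognize the cache node by its type and
id while the API layer stays a thin wrapper.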

1b) The FLIP states: "The default table service stores the metadata in
the client (e.g. TableEnvironment)"

`TableEnvironment` is not a client. Similar to `Table`, it is an API
class that just delegates to other session components. Currently, the
table environment has (again) too many responsibilities that should
better be split into other components. The table's `Executor` is the
client that performs the cluster communication. But in general it also
just delegates to `org.apache.flink.core.execution.PipelineExecutor`.
IMO the `PipelineExecutor` is a better fit for a back-and-forth
communication to determine existing cluster partitions and modify the
job graph. Or even further down the stack, because as far as I know,
`PipelineExecutor` works with `StreamGraph`.

`flink-table-api-java` has no dependency on `flink-runtime`. This has
been done on purpose.

2) API
I still see the rejected option 2 as a good fit for exposing this feature.

A `Table.cache(): CachedTable` with `CachedTable.invalidate(): void` and
maybe `CachedTable.getId(): String` makes the feature and its operations
very explicit. It also avoids follow-up questions such as:

Will `invalidateCache()` be transitively propagated in
`t1.cache().join(t2.cache()).invalidateCache()`?

Or as the FLIP states:

`Table t3 = t1.select(...) // cache will NOT be used.`
but
`t1.invalidateCache() // cache will be released`

This sounds a bit contradictory to me. Because sometimes the
`t1.cache()` has implications on t1 and sometimes not.
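
A small sketch of why I find the explicit type cleaner. Everything here
(`SimpleTable`, `CachedSimpleTable`, `CacheRegistry`) is hypothetical and
made up for illustration; in particular, registering the cache id at
`cache()` time is a simplification, since the real cache would only exist
after a job has run. The point is only that `invalidate()` lives on the
cached type, so derived tables cannot invalidate anything and the
"transitive" question never comes up.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical registry of live cache ids; stands in for whatever
// component would track the cached intermediate results.
final class CacheRegistry {
    private final Set<String> live = new HashSet<>();

    void register(String id) {
        live.add(id);
    }

    void release(String id) {
        live.remove(id);
    }

    boolean isLive(String id) {
        return live.contains(id);
    }
}

// Hypothetical minimal table with just the operations needed here.
class SimpleTable {
    protected final CacheRegistry registry;
    protected final String plan;

    SimpleTable(CacheRegistry registry, String plan) {
        this.registry = registry;
        this.plan = plan;
    }

    // Any relational operation returns a plain table, never a cached one.
    SimpleTable select(String fields) {
        return new SimpleTable(registry, "Select(" + fields + ", " + plan + ")");
    }

    // Caching returns a distinct type that owns the cache handle.
    CachedSimpleTable cache() {
        String id = UUID.randomUUID().toString();
        registry.register(id);
        return new CachedSimpleTable(registry, plan, id);
    }

    String plan() {
        return plan;
    }
}

// Only this type exposes getId() and invalidate().
final class CachedSimpleTable extends SimpleTable {
    private final String id;

    CachedSimpleTable(CacheRegistry registry, String plan, String id) {
        super(registry, "Cache(" + plan + ")");
        this.id = id;
    }

    String getId() {
        return id;
    }

    void invalidate() {
        registry.release(id);
    }
}
```

With this, `t1.cache().select("a").invalidate()` simply does not compile,
and `t1` itself is never affected by a call to `t1.cache()`.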

3) Big picture

After reading the FLIP, I still don't understand how a user can
configure or control the table service. Will we offer options through
`TableConfig` or `TableEnvironment` or is this configuration done via
ConfigOptions for lower layers?

4) SQL integration

As I mentioned earlier, we should think about a SQL integration as well.
Otherwise we need to redesign the Java API to align it with SQL later.
SQL also has a bigger user base than Table API. Let's assume we
introduce a new keyword and combine the caching with regular CREATE VIEW
syntax such as:
`CREATE CACHED TEMPORARY VIEW MyTable AS SELECT *`
This would align well with:
`tEnv.createTemporaryView("MyTable", table.cache())`

What do you think?

5) TableEnvironment.close()

Will `TableEnvironment` implement `AutoCloseable`?
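
To illustrate why I am asking: if it did, users could rely on
try-with-resources to release the caches. The sketch below uses a
hypothetical `CachingSession` class, not `TableEnvironment`, and it
assumes the "unusable after close" semantics that Xuannan described
earlier in this thread.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical session object that owns cached results. It only tracks
// ids; a real implementation would release cluster partitions in close().
final class CachingSession implements AutoCloseable {
    private final List<String> cacheIds = new ArrayList<>();
    private boolean closed;

    void cache(String id) {
        ensureOpen();
        cacheIds.add(id);
    }

    int liveCaches() {
        return cacheIds.size();
    }

    boolean isClosed() {
        return closed;
    }

    private void ensureOpen() {
        if (closed) {
            throw new IllegalStateException("session is closed");
        }
    }

    // Drops all caches and rejects further use. Declared without a checked
    // exception so callers do not need a catch block.
    @Override
    public void close() {
        cacheIds.clear();
        closed = true;
    }
}

final class CachingSessionDemo {
    public static void main(String[] args) {
        try (CachingSession session = new CachingSession()) {
            session.cache("t1");
            System.out.println("live caches: " + session.liveCaches());
        }
        // Leaving the block has called close(), so nothing can leak.
    }
}
```

If the environment is meant to stay usable after the caches are dropped,
then `AutoCloseable` would be the wrong signal and a differently named
method would fit better.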


In summary, I think the FLIP should go into more detail about how this
effort affects each layer, because a lot of the interfaces are `@Public`
or `@PublicEvolving`. The FLIP also still leaves a lot of questions open
about how this high-level concept ends up in the JobGraph.

Regards,
Timo



On 30.07.20 09:00, Xuannan Su wrote:

> Hi folks,
>
> It seems that all the raised concerns so far have been resolved. I plan to start a voting thread for FLIP-36 early next week if there are no comments.
>
> Thanks,
> Xuannan
> On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <[hidden email]>, wrote:
>> Hi Kurt,
>>
>> Thanks for the comments.
>>
>> You are right that the FLIP lacks a proper discussion about the impact of the optimizer. I have added the section to talk about how the cache table works with the optimizer. I hope this could resolve your concern. Please let me know if you have any further comments.
>>
>> Best,
>> Xuannan
>> On Jul 22, 2020, 4:36 PM +0800, Kurt Young <[hidden email]>, wrote:
>>> Thanks for the reply, I have one more comment about the optimizer
>>> affection. Even if you are
>>> trying to make the cached table be as orthogonal to the optimizer as
>>> possible by introducing
>>> a special sink, it is still not clear why this approach is safe. Maybe you
>>> can add some process
>>> introduction from API to JobGraph, otherwise I can't make sure everyone
>>> reviewing the design
>>> doc will have the same imagination about this. And I'm also quite sure some
>>> of the existing
>>> mechanism will be affected by this special sink, e.g. multi sink
>>> optimization.
>>>
>>> Best,
>>> Kurt
>>>
>>>
>>> On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <[hidden email]> wrote:
>>>
>>>> Hi Kurt,
>>>>
>>>> Thanks for the comments.
>>>>
>>>> 1. How do you identify the CachedTable?
>>>> For the current design proposed in FLIP-36, we are using the first
>>>> approach you mentioned, where the key of the map is the Cached Table java
>>>> object. I think it is fine not to be able to identify another table
>>>> representing the same DAG and not using the cached intermediate result
>>>> because we want to make the caching table explicit. As mentioned in the
>>>> FLIP, the cache API will return a Table object. And the user has to use the
>>>> returned Table object to make use of the cached table. The rationale is
>>>> that if the user builds the same DAG from scratch with some
>>>> TableEnvironment instead of using the cached table object, the user
>>>> probably doesn't want to use the cache.
>>>>
>>>> 2. How does the CachedTable affect the optimizer?
>>>> We try to make the logic dealing with the cached table be as orthogonal to
>>>> the optimizer as possible. That's why we introduce a special sink when we
>>>> are going to cache a table and a special source when we are going to use a
>>>> cached table. This way, we can let the optimizer does it works, and the
>>>> logic of modifying the job graph can happen in the job graph generator. We
>>>> can recognize the cached node with the special sink and source.
>>>>
>>>> 3. What's the effect of calling TableEnvironment.close()?
>>>> We introduce the close method to prevent leaking of the cached table when
>>>> the user is done with the table environment. Therefore, it makes more sense
>>>> that the table environment, including all of its functionality, should not
>>>> be used after closing. Otherwise, we should rename the close method to
>>>> clearAllCache or something similar.
>>>>
>>>> And thanks for pointing out the use of not existing API used in the given
>>>> examples. I have updated the examples in the FLIP accordingly.
>>>>
>>>> Best,
>>>> Xuannan
>>>> On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[hidden email]>, wrote:
>>>>> Hi Xuanna,
>>>>>
>>>>> Thanks for the detailed design doc, it described clearly how the API
>>>> looks
>>>>> and how to interact with Flink runtime.
>>>>> However, the part which relates to SQL's optimizer is kind of blurry. To
>>>> be
>>>>> more precise, I have following questions:
>>>>>
>>>>> 1. How do you identify the CachedTable? I can imagine there would be map
>>>>> representing the cache, how do you
>>>>> compare the keys of the map? One approach is they will be compared by
>>>> java
>>>>> objects, which is simple but has
>>>>> limited scope. For example, users created another table using some
>>>>> interfaces of TableEnvironment, and the table
>>>>> is exactly the same as the cached one, you won't be able to identify it.
>>>>> Another choice is calculating the "signature" or
>>>>> "diest" of the cached table, which involves string representation of the
>>>>> whole sub tree represented by the cached table.
>>>>> I don't think Flink currently provides such a mechanism around Table
>>>>> though.
>>>>>
>>>>> 2. How does the CachedTable affect the optimizer? Specifically, will you
>>>>> have a dedicated QueryOperation for it, will you have
>>>>> a dedicated logical & physical RelNode for it? And I also don't see a
>>>>> description about how to work with current optimize phases,
>>>>> from Operation to Calcite rel node, and then to Flink's logical and
>>>>> physical node, which will be at last translated to Flink's exec node.
>>>>> There also exists other optimizations such as dead lock breaker, as well
>>>> as
>>>>> sub plan reuse inside the optimizer, I'm not sure whether
>>>>> the logic dealing with cached tables can be orthogonal to all of these.
>>>>> Hence I expect you could have a more detailed description here.
>>>>>
>>>>> 3. What's the effect of calling TableEnvironment.close()? You already
>>>>> explained this would drop all caches this table env has,
>>>>> could you also explain where other functionality still works for this
>>>> table
>>>>> env? Like can use still create/drop tables/databases/function
>>>>> through this table env? What happens to the catalog and all temporary
>>>>> objects of this table env?
>>>>>
>>>>> One minor comment: I noticed you used some not existing API in the
>>>> examples
>>>>> you gave, like table.collect(), which is a little
>>>>> misleading.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
>>>>>
>>>>>> Hi folks,
>>>>>>
>>>>>> I'd like to revive the discussion about FLIP-36 Support Interactive
>>>>>> Programming in Flink Table API
>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>>>
>>>>>> The FLIP proposes to add support for interactive programming in Flink
>>>>>> Table API. Specifically, it let users cache the intermediate
>>>>>> results(tables) and use them in the later jobs to avoid recomputing the
>>>>>> intermediate result(tables).
>>>>>>
>>>>>> I am looking forward to any opinions and suggestions from the
>>>> community.
>>>>>>
>>>>>> Best,
>>>>>> Xuannan
>>>>>> On May 7, 2020, 5:40 PM +0800, Xuannan Su <[hidden email]>,
>>>> wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> There are some feedbacks from @Timo and @Kurt in the voting thread
>>>> for
>>>>>> FLIP-36 and I want to share my thoughts here.
>>>>>>>
>>>>>>> 1. How would the FLIP-36 look like after FLIP-84?
>>>>>>> I don't think FLIP-84 will affect FLIP-36 from the public API
>>>>>> perspective. Users can call .cache on a table object and the cached
>>>> table
>>>>>> will be generated whenever the table job is triggered to execute,
>>>> either by
>>>>>> Table#executeInsert or StatementSet#execute. I think that FLIP-36
>>>> should
>>>>>> aware of the changes made by FLIP-84, but it shouldn't be a problem.
>>>> At the
>>>>>> end of the day, FLIP-36 only requires the ability to add a sink to a
>>>> node,
>>>>>> submit a table job with multiple sinks, and replace the cached table
>>>> with a
>>>>>> source.
>>>>>>>
>>>>>>> 2. How can we support cache in a multi-statement SQL file?
>>>>>>> The most intuitive way to support cache in a multi-statement SQL
>>>> file is
>>>>>> by using a view, where the view is corresponding to a cached table.
>>>>>>>
>>>>>>> 3. Unifying the cached table and materialized views
>>>>>>> It is true that the cached table and the materialized view are
>>>> similar
>>>>>> in some way. However, I think the materialized view is a more complex
>>>>>> concept. First, a materialized view requires some kind of a refresh
>>>>>> mechanism to synchronize with the table. Secondly, the life cycle of a
>>>>>> materialized view is longer. The materialized view should be accessible
>>>>>> even after the application exits and should be accessible by another
>>>>>> application, while the cached table is only accessible in the
>>>> application
>>>>>> where it is created. The cached table is introduced to avoid
>>>> recomputation
>>>>>> of an intermediate table to support interactive programming in Flink
>>>> Table
>>>>>> API. And I think the materialized view needs more discussion and
>>>> certainly
>>>>>> deserves a whole new FLIP.
>>>>>>>
>>>>>>> Please let me know your thought.
>>>>>>>
>>>>>>> Best,
>>>>>>> Xuannan
>>>>>>>
>>>>>> On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]>
>>>> wrote:
>>>>>>> Hi folks,
>>>>>>>
>>>>>>> The FLIP-36 is updated according to the discussion with Becket. In
>>>> the
>>>>>> meantime, any comments are very welcome.
>>>>>>>
>>>>>>> If there are no further comments, I would like to start the voting
>>>>>>> thread by tomorrow.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Xuannan
>>>>>>>
>>>>>>>
>>>>>>>> On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]>
>>>>>> wrote:
>>>>>>>>> Hi Becket,
>>>>>>>>>
>>>>>>>>> You are right. It makes sense to treat retry of job 2 as an
>>>> ordinary
>>>>>> job. And the config does introduce some unnecessary confusion. Thank
>>>> you
>>>>>> for you comment. I will update the FLIP.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Xuannan
>>>>>>>>>
>>>>>>>>>> On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
>>>> [hidden email]>
>>>>>> wrote:
>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>
>>>>>>>>>>> If user submits Job 1 and generated a cached intermediate
>>>>>> result. And later
>>>>>>>>>>> on, user submitted job 2 which should ideally use the
>>>>>> intermediate result.
>>>>>>>>>>> In that case, if job 2 failed due to missing the intermediate
>>>>>> result, Job 2
>>>>>>>>>>> should be retried with its full DAG. After that when Job 2
>>>> runs,
>>>>>> it will
>>>>>>>>>>> also re-generate the cache. However, once job 2 has fell
>>>> back to
>>>>>> the
>>>>>>>>>>> original DAG, should it just be treated as an ordinary job
>>>> that
>>>>>> follow the
>>>>>>>>>>> recovery strategy? Having a separate configuration seems a
>>>> little
>>>>>>>>>>> confusing. In another word, re-generating the cache is just a
>>>>>> byproduct of
>>>>>>>>>>> running the full DAG of job 2, but is not the main purpose.
>>>> It
>>>>>> is just like
>>>>>>>>>>> when job 1 runs to generate cache, it does not have a
>>>> separate
>>>>>> config of
>>>>>>>>>>> retry to make sure the cache is generated. If it fails, it
>>>> just
>>>>>> fail like
>>>>>>>>>>> an ordinary job.
>>>>>>>>>>>
>>>>>>>>>>> What do you think?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
>>>>>> [hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>
>>>>>>>>>>>> The intermediate result will indeed be automatically
>>>>>> re-generated by
>>>>>>>>>>>> resubmitting the original DAG. And that job could fail as
>>>>>> well. In that
>>>>>>>>>>>> case, we need to decide if we should resubmit the original
>>>> DAG
>>>>>> to
>>>>>>>>>>>> re-generate the intermediate result or give up and throw an
>>>>>> exception to
>>>>>>>>>>>> the user. And the config is to indicate how many resubmit
>>>>>> should happen
>>>>>>>>>>>> before giving up.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Xuannan
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I am not entirely sure if I understand the cases you
>>>>>> mentioned. The
>>>>>>>>>>>> users
>>>>>>>>>>>>>> can use the cached table object returned by the
>>>> .cache()
>>>>>> method in
>>>>>>>>>>>> other
>>>>>>>>>>>>>> job and it should read the intermediate result. The
>>>>>> intermediate result
>>>>>>>>>>>>> can
>>>>>>>>>>>>>> gone in the following three cases: 1. the user
>>>> explicitly
>>>>>> call the
>>>>>>>>>>>>>> invalidateCache() method 2. the TableEnvironment is
>>>> closed
>>>>>> 3. failure
>>>>>>>>>>>>>> happens on the TM. When that happens, the intermediate
>>>>>> result will not
>>>>>>>>>>>> be
>>>>>>>>>>>>>> available unless it is re-generated.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> What confused me was that why do we need to have a
>>>>>> *cache.retries.max
>>>>>>>>>>>>> *config?
>>>>>>>>>>>>> Shouldn't the missing intermediate result always be
>>>>>> automatically
>>>>>>>>>>>>> re-generated if it is gone?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
>>>>>> [hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
>>>>>> [hidden email]>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks for picking up the FLIP. It looks good to me
>>>>>> overall. Some
>>>>>>>>>>>> quick
>>>>>>>>>>>>>>> comments / questions below:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 1. Do we also need changes in the Java API?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Yes, the public interface of Table and TableEnvironment
>>>>>> should be made
>>>>>>>>>>>> in
>>>>>>>>>>>>>> the Java API.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> 2. What are the cases that users may want to retry
>>>>>> reading the
>>>>>>>>>>>>>> intermediate
>>>>>>>>>>>>>>> result? It seems that once the intermediate result
>>>> has
>>>>>> gone, it will
>>>>>>>>>>>>> not
>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>> available later without being generated again, right?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I am not entirely sure if I understand the cases you
>>>>>> mentioned. The
>>>>>>>>>>>>> users
>>>>>>>>>>>>>> can use the cached table object returned by the
>>>> .cache()
>>>>>> method in
>>>>>>>>>>>> other
>>>>>>>>>>>>>> job and it should read the intermediate result. The
>>>>>> intermediate result
>>>>>>>>>>>>> can
>>>>>>>>>>>>>> gone in the following three cases: 1. the user
>>>> explicitly
>>>>>> call the
>>>>>>>>>>>>>> invalidateCache() method 2. the TableEnvironment is
>>>> closed
>>>>>> 3. failure
>>>>>>>>>>>>>> happens on the TM. When that happens, the intermediate
>>>>>> result will not
>>>>>>>>>>>> be
>>>>>>>>>>>>>> available unless it is re-generated.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> 3. In the "semantic of cache() method" section, the
>>>>>> description "The
>>>>>>>>>>>>>>> semantic of the *cache() *method is a little
>>>> different
>>>>>> depending on
>>>>>>>>>>>>>> whether
>>>>>>>>>>>>>>> auto caching is enabled or not." seems not explained.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This line is actually outdated and should be removed,
>>>> as
>>>>>> we are not
>>>>>>>>>>>>> adding
>>>>>>>>>>>>>> the auto caching functionality in this FLIP. Auto
>>>> caching
>>>>>> will be added
>>>>>>>>>>>>> in
>>>>>>>>>>>>>> the future, and the semantic of cache() when auto
>>>> caching
>>>>>> is enabled
>>>>>>>>>>>> will
>>>>>>>>>>>>>> be discussed in detail by a new FLIP. I will remove the
>>>>>> descriptor to
>>>>>>>>>>>>> avoid
>>>>>>>>>>>>>> further confusion.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
>>>>>> [hidden email]>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I'd like to start the discussion about FLIP-36
>>>> Support
>>>>>> Interactive
>>>>>>>>>>>>>>>> Programming in Flink Table API
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The FLIP proposes to add support for interactive
>>>>>> programming in
>>>>>>>>>>>> Flink
>>>>>>>>>>>>>>> Table
>>>>>>>>>>>>>>>> API. Specifically, it let users cache the
>>>> intermediate
>>>>>>>>>>>>> results(tables)
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>>> use them in the later jobs.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Even though the FLIP has been discussed in the
>>>>>> past[1], the FLIP
>>>>>>>>>>>>> hasn't
>>>>>>>>>>>>>>>> formally passed the vote yet. And some of the
>>>> design
>>>>>> and
>>>>>>>>>>>>> implementation
>>>>>>>>>>>>>>>> detail have to change to incorporates the cluster
>>>>>> partition
>>>>>>>>>>>> proposed
>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>> FLIP-67[2].
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Xuannan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>
>>>> https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>
>>>>
>

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Timo,

Thanks for your comments. After the offline discussion, I have updated the FLIP with the following changes.

1. Update the end to end process
    a. The Table.cache method should only wrap the original query operation with a CacheOperation.
    b. The planner will add the CacheSink or replace the subtree with a CacheSource while translating the QueryOperation to rel nodes. The CacheSink and CacheSource are treated as regular sinks and sources while the planner optimizes and translates the rel nodes.
    c. The StreamGraphGenerator will recognize the CacheSink and CacheSource and generate the StreamGraph accordingly. When it sees the CacheSink, it will remove the CacheSink and set the cache flag in the StreamNode. When it sees the CacheSource, it will include the ClusterPartitionDescriptor in the StreamNode.
    d. The JobGraph generator will not modify the graph. It only does one of two things: if it sees a StreamNode with the cache flag set, it sets the result partition type of the intermediate result to BLOCKING_PERSISTENT; if the StreamNode has a ClusterPartitionDescriptor, it passes that descriptor from the StreamNode to the JobVertex.
    e. The ClusterPartitionDescriptor will be included in the JobResult and sent back to the client.
2. The metadata of the cached table will be stored in the CatalogManager instead of the TableEnvironment.
3. The Table.cache method returns a CachedTable, which is a subclass of Table.
4. Add a paragraph to explain that the cached table will not work in a Per-Job Mode cluster.
5. Add SQL integration and Cache Eviction in the future work section.
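
Steps 1a and 1b above could be sketched roughly as follows. This is only a toy illustration, not the planner code: `QueryOp`, `PlainQuery`, `translate`, the `knownPartitions` map, and the string-based "plan" are hypothetical stand-ins, and only the names CacheOperation, CacheSink, and CacheSource come from the FLIP.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class CachePlanningSketch {

    /** Hypothetical, simplified stand-in for a node in the QueryOperation tree. */
    interface QueryOp {
        String describe();
    }

    /** An ordinary query such as a scan or a projection. */
    static final class PlainQuery implements QueryOp {
        final String name;

        PlainQuery(String name) {
            this.name = name;
        }

        @Override
        public String describe() {
            return name;
        }
    }

    /** Step 1a: caching only wraps the original operation and assigns an identifier. */
    static final class CacheOperation implements QueryOp {
        final String cacheId;
        final QueryOp child;

        CacheOperation(String cacheId, QueryOp child) {
            this.cacheId = cacheId;
            this.child = child;
        }

        @Override
        public String describe() {
            return "Cache(" + child.describe() + ")";
        }
    }

    /** What a cache() call would do in this sketch: wrap, nothing else. */
    static CacheOperation cache(QueryOp op) {
        return new CacheOperation(UUID.randomUUID().toString(), op);
    }

    /**
     * Step 1b: during translation, a CacheOperation becomes a CacheSource if a
     * partition descriptor is already known for its id; otherwise a CacheSink is
     * added on top of the translated subtree. The plan is a plain string here.
     */
    static String translate(QueryOp op, Map<String, String> knownPartitions) {
        if (op instanceof CacheOperation) {
            CacheOperation c = (CacheOperation) op;
            String descriptor = knownPartitions.get(c.cacheId);
            if (descriptor != null) {
                return "CacheSource[" + descriptor + "]";
            }
            return "CacheSink(" + translate(c.child, knownPartitions) + ")";
        }
        return op.describe();
    }

    public static void main(String[] args) {
        Map<String, String> knownPartitions = new HashMap<>();
        CacheOperation cached = cache(new PlainQuery("t1"));

        // First job: nothing is cached yet, so a sink is added.
        System.out.println(translate(cached, knownPartitions)); // CacheSink(t1)

        // After the first job, the client knows the partition descriptor.
        knownPartitions.put(cached.cacheId, "partition-1");
        System.out.println(translate(cached, knownPartitions)); // CacheSource[partition-1]
    }
}
```

The point of the sketch is that the API layer stays free of optimizer-like logic: the sink-or-source decision happens only at translation time.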

As for the @Public and @PublicEvolving interfaces, I think they should be covered in the public interface section, i.e., Table and TableEnvironment. Other than those, all the changes should only affect internal classes.
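
To make changes 2 and 3 concrete, here is a minimal sketch of how the public surface could look. It is an assumption for illustration only: `CacheRegistry` is a hypothetical stand-in for wherever the CatalogManager keeps the cache metadata, the `Table` class is reduced to an id, and the method name `invalidateCache()` is taken from the earlier discussion rather than from a finalized API.

```java
import java.util.HashSet;
import java.util.Set;

public class CachedTableSketch {

    /** Hypothetical stand-in for the component that holds cache metadata. */
    static final class CacheRegistry {
        private final Set<String> cachedIds = new HashSet<>();

        void register(String id) {
            cachedIds.add(id);
        }

        void remove(String id) {
            cachedIds.remove(id);
        }

        boolean contains(String id) {
            return cachedIds.contains(id);
        }
    }

    /** Minimal stand-in for the Table API class; it only carries an id here. */
    static class Table {
        final String id;
        final CacheRegistry registry;

        Table(String id, CacheRegistry registry) {
            this.id = id;
            this.registry = registry;
        }

        /**
         * Returns a CachedTable. For brevity the metadata is registered right away;
         * in the proposal the cached data only exists after a job has executed.
         */
        CachedTable cache() {
            registry.register(id);
            return new CachedTable(id, registry);
        }
    }

    /** CachedTable is a subclass of Table, so it can be used wherever a Table is. */
    static final class CachedTable extends Table {
        CachedTable(String id, CacheRegistry registry) {
            super(id, registry);
        }

        /** Explicitly releases the cache; only available on the cached handle. */
        void invalidateCache() {
            registry.remove(id);
        }
    }

    public static void main(String[] args) {
        CacheRegistry registry = new CacheRegistry();
        Table t1 = new Table("t1", registry);

        CachedTable cached = t1.cache();
        System.out.println(registry.contains("t1")); // true

        cached.invalidateCache();
        System.out.println(registry.contains("t1")); // false
    }
}
```

Because invalidation lives on `CachedTable` only, a call such as `t1.invalidateCache()` on a plain `Table` does not compile in this sketch, which keeps the cache operations explicit.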

Please let me know if you have any further comments.

Best,
Xuannan
On Aug 10, 2020, 9:32 PM +0800, Timo Walther <[hidden email]>, wrote:

> Hi Xuannan,
>
> sorry for joining the discussion so late. I agree that this is a very
> nice and useful feature. However, the impact it has on many components
> in the stack requires more discussion in my opinion.
>
> 1) Separation of concerns:
> The current design seems to mix different layers. We should make sure
> that all layers do what they are supposed to do:
>
> 1a) The FLIP states: "The cache() method returns a new Table object with
> a flag set."
>
> The `Table` object is just a thin API class that wraps a
> `QueryOperation`. Other than that the `Table` object should not contain
> further state. The tree of `QueryOperation` should be an immutable,
> independent data structure that can be passed around and will eventually
> be passed to the `Planner`.
>
> The mentioned `CacheSink` should be added by the optimizer. It is not
> the responsibility of the API to perform optimizer-like tasks. A call to
> `t1.cache()` should simply wrap the original operation into something
> like `CacheOperation(t1.getQueryOperation)`. A `CacheOperation` could
> still extend from `QueryOperation` and assign a unique string identifier
> already. A specialized `StreamTransformation` would be necessary during
> translation.
>
> 1b) The FLIP states: "The default table service stores the metadata in
> the client (e.g. TableEnvironment)"
>
> `TableEnvironment` is not a client. Similar to `Table`, it is an API
> class that just delegates to other session components. Currently, the
> table environment has (again) too many responsibilities that should
> better be split into other components. The table's `Executor` is the
> client that performs the cluster communication. But in general it also
> just delegates to `org.apache.flink.core.execution.PipelineExecutor`.
> IMO the `PipelineExecutor` is a better fit for a back-and-forth
> communication to determine existing cluster partitions and modify the
> job graph. Or even further down the stack, because as far as I know,
> `PipelineExecutor` works with `StreamGraph`.
>
> `flink-table-api-java` has no dependency on `flink-runtime`. This has
> been done on purpose.
>
> 2) API
> I still see the rejected option 2 as a good fit for exposing this feature.
>
> A `Table.cache(): CachedTable` with `CachedTable.invalidate(): void` and
> maybe `CachedTable.getId(): String` makes the feature and its operations
> very explicit. It also avoids follow-up questions such as:
>
> Will `invalidateCache()` be transitively propagated in
> `t1.cache().join(t2.cache()).invalidateCache()`?
>
> Or as the FLIP states:
>
> `Table t3 = t1.select(...) // cache will NOT be used.`
> but
> `t1.invalidateCache() // cache will be released`
>
> This sounds a bit contradictory to me, because sometimes
> `t1.cache()` has implications for t1 and sometimes not.
>
> 3) Big picture
>
> After reading the FLIP, I still don't understand how a user can
> configure or control the table service. Will we offer options through
> `TableConfig` or `TableEnvironment` or is this configuration done via
> ConfigOptions for lower layers?
>
> 4) SQL integration
>
> As I mentioned earlier, we should think about a SQL integration as well.
> Otherwise we need to redesign the Java API to align it with SQL later.
> SQL has also a bigger user base than Table API. Let's assume we
> introduce a new keyword and combine the caching with regular CREATE VIEW
> syntax such as:
> `CREATE CACHED TEMPORARY VIEW MyTable AS SELECT *`
> This would align well with:
> `tEnv.registerTemporaryView("MyTable", table.cache())`
>
> What do you think?
>
> 5) TableEnvironment.close()
>
> Will `TableEnvironment` implement `AutoCloseable`?
>
>
> In summary, I think the FLIP should go into more detail about how this
> effort affects each layer, because a lot of the interfaces are `@Public` or
> `@PublicEvolving`. The FLIP also still leaves open a lot of questions about
> how this high-level concept ends up in the JobGraph.
>
> Regards,
> Timo
>
>
>
> On 30.07.20 09:00, Xuannan Su wrote:
> > Hi folks,
> >
> > It seems that all the raised concerns so far have been resolved. I plan to start a voting thread for FLIP-36 early next week if there are no comments.
> >
> > Thanks,
> > Xuannan
> > On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <[hidden email]>, wrote:
> > > Hi Kurt,
> > >
> > > Thanks for the comments.
> > >
> > > You are right that the FLIP lacks a proper discussion about the impact on the optimizer. I have added a section that describes how the cached table works with the optimizer. I hope this resolves your concern. Please let me know if you have any further comments.
> > >
> > > Best,
> > > Xuannan
> > > On Jul 22, 2020, 4:36 PM +0800, Kurt Young <[hidden email]>, wrote:
> > > > Thanks for the reply, I have one more comment about the effect on the
> > > > optimizer. Even if you are
> > > > trying to make the cached table be as orthogonal to the optimizer as
> > > > possible by introducing
> > > > a special sink, it is still not clear why this approach is safe. Maybe you
> > > > can add some process
> > > > introduction from API to JobGraph, otherwise I can't make sure everyone
> > > > reviewing the design
> > > > doc will have the same imagination about this. And I'm also quite sure some
> > > > of the existing
> > > > mechanism will be affected by this special sink, e.g. multi sink
> > > > optimization.
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <[hidden email]> wrote:
> > > >
> > > > > Hi Kurt,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > 1. How do you identify the CachedTable?
> > > > > For the current design proposed in FLIP-36, we are using the first
> > > > > approach you mentioned, where the key of the map is the Cached Table java
> > > > > object. I think it is fine not to be able to identify another table
> > > > > representing the same DAG and not using the cached intermediate result
> > > > > because we want to make the caching table explicit. As mentioned in the
> > > > > FLIP, the cache API will return a Table object. And the user has to use the
> > > > > returned Table object to make use of the cached table. The rationale is
> > > > > that if the user builds the same DAG from scratch with some
> > > > > TableEnvironment instead of using the cached table object, the user
> > > > > probably doesn't want to use the cache.
> > > > >
> > > > > 2. How does the CachedTable affect the optimizer?
> > > > > We try to make the logic dealing with the cached table be as orthogonal to
> > > > > the optimizer as possible. That's why we introduce a special sink when we
> > > > > are going to cache a table and a special source when we are going to use a
> > > > > > cached table. This way, we can let the optimizer do its work, and the
> > > > > logic of modifying the job graph can happen in the job graph generator. We
> > > > > can recognize the cached node with the special sink and source.
> > > > >
> > > > > 3. What's the effect of calling TableEnvironment.close()?
> > > > > We introduce the close method to prevent leaking of the cached table when
> > > > > the user is done with the table environment. Therefore, it makes more sense
> > > > > that the table environment, including all of its functionality, should not
> > > > > be used after closing. Otherwise, we should rename the close method to
> > > > > clearAllCache or something similar.
> > > > >
> > > > > > And thanks for pointing out the use of non-existent APIs in the given
> > > > > > examples. I have updated the examples in the FLIP accordingly.
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > > On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[hidden email]>, wrote:
> > > > > > > Hi Xuannan,
> > > > > >
> > > > > > Thanks for the detailed design doc, it described clearly how the API
> > > > > looks
> > > > > > and how to interact with Flink runtime.
> > > > > > However, the part which relates to SQL's optimizer is kind of blurry. To
> > > > > be
> > > > > > more precise, I have following questions:
> > > > > >
> > > > > > 1. How do you identify the CachedTable? I can imagine there would be map
> > > > > > representing the cache, how do you
> > > > > > compare the keys of the map? One approach is they will be compared by
> > > > > java
> > > > > > objects, which is simple but has
> > > > > > limited scope. For example, users created another table using some
> > > > > > interfaces of TableEnvironment, and the table
> > > > > > is exactly the same as the cached one, you won't be able to identify it.
> > > > > > Another choice is calculating the "signature" or
> > > > > > > "digest" of the cached table, which involves string representation of the
> > > > > > whole sub tree represented by the cached table.
> > > > > > I don't think Flink currently provides such a mechanism around Table
> > > > > > though.
> > > > > >
> > > > > > 2. How does the CachedTable affect the optimizer? Specifically, will you
> > > > > > have a dedicated QueryOperation for it, will you have
> > > > > > a dedicated logical & physical RelNode for it? And I also don't see a
> > > > > > description about how to work with current optimize phases,
> > > > > > from Operation to Calcite rel node, and then to Flink's logical and
> > > > > > physical node, which will be at last translated to Flink's exec node.
> > > > > > There also exists other optimizations such as dead lock breaker, as well
> > > > > as
> > > > > > sub plan reuse inside the optimizer, I'm not sure whether
> > > > > > the logic dealing with cached tables can be orthogonal to all of these.
> > > > > > Hence I expect you could have a more detailed description here.
> > > > > >
> > > > > > 3. What's the effect of calling TableEnvironment.close()? You already
> > > > > > explained this would drop all caches this table env has,
> > > > > > could you also explain where other functionality still works for this
> > > > > table
> > > > > > env? Like can use still create/drop tables/databases/function
> > > > > > through this table env? What happens to the catalog and all temporary
> > > > > > objects of this table env?
> > > > > >
> > > > > > One minor comment: I noticed you used some not existing API in the
> > > > > examples
> > > > > > you gave, like table.collect(), which is a little
> > > > > > misleading.
> > > > > >
> > > > > > Best,
> > > > > > Kurt
> > > > > >
> > > > > >
> > > > > > On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
> > > > > >
> > > > > > > Hi folks,
> > > > > > >
> > > > > > > I'd like to revive the discussion about FLIP-36 Support Interactive
> > > > > > > Programming in Flink Table API
> > > > > > >
> > > > > > >
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > >
> > > > > > > The FLIP proposes to add support for interactive programming in Flink
> > > > > > > Table API. Specifically, it let users cache the intermediate
> > > > > > > results(tables) and use them in the later jobs to avoid recomputing the
> > > > > > > intermediate result(tables).
> > > > > > >
> > > > > > > I am looking forward to any opinions and suggestions from the
> > > > > community.
> > > > > > >
> > > > > > > Best,
> > > > > > > Xuannan
> > > > > > > On May 7, 2020, 5:40 PM +0800, Xuannan Su <[hidden email]>,
> > > > > wrote:
> > > > > > > > Hi,
> > > > > > > >
> > > > > > > > There are some feedbacks from @Timo and @Kurt in the voting thread
> > > > > for
> > > > > > > FLIP-36 and I want to share my thoughts here.
> > > > > > > >
> > > > > > > > 1. How would the FLIP-36 look like after FLIP-84?
> > > > > > > > I don't think FLIP-84 will affect FLIP-36 from the public API
> > > > > > > perspective. Users can call .cache on a table object and the cached
> > > > > table
> > > > > > > will be generated whenever the table job is triggered to execute,
> > > > > either by
> > > > > > > Table#executeInsert or StatementSet#execute. I think that FLIP-36
> > > > > should
> > > > > > > aware of the changes made by FLIP-84, but it shouldn't be a problem.
> > > > > At the
> > > > > > > end of the day, FLIP-36 only requires the ability to add a sink to a
> > > > > node,
> > > > > > > submit a table job with multiple sinks, and replace the cached table
> > > > > with a
> > > > > > > source.
> > > > > > > >
> > > > > > > > 2. How can we support cache in a multi-statement SQL file?
> > > > > > > > The most intuitive way to support cache in a multi-statement SQL
> > > > > file is
> > > > > > > by using a view, where the view is corresponding to a cached table.
> > > > > > > >
> > > > > > > > 3. Unifying the cached table and materialized views
> > > > > > > > It is true that the cached table and the materialized view are
> > > > > similar
> > > > > > > in some way. However, I think the materialized view is a more complex
> > > > > > > concept. First, a materialized view requires some kind of a refresh
> > > > > > > mechanism to synchronize with the table. Secondly, the life cycle of a
> > > > > > > materialized view is longer. The materialized view should be accessible
> > > > > > > even after the application exits and should be accessible by another
> > > > > > > application, while the cached table is only accessible in the
> > > > > application
> > > > > > > where it is created. The cached table is introduced to avoid
> > > > > recomputation
> > > > > > > of an intermediate table to support interactive programming in Flink
> > > > > Table
> > > > > > > API. And I think the materialized view needs more discussion and
> > > > > certainly
> > > > > > > deserves a whole new FLIP.
> > > > > > > >
> > > > > > > > Please let me know your thought.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]>
> > > > > wrote:
> > > > > > > > Hi folks,
> > > > > > > >
> > > > > > > > The FLIP-36 is updated according to the discussion with Becket. In
> > > > > the
> > > > > > > meantime, any comments are very welcome.
> > > > > > > >
> > > > > > > > If there are no further comments, I would like to start the voting
> > > > > > > > thread by tomorrow.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Xuannan
> > > > > > > >
> > > > > > > >
> > > > > > > > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]>
> > > > > > > wrote:
> > > > > > > > > > Hi Becket,
> > > > > > > > > >
> > > > > > > > > > You are right. It makes sense to treat retry of job 2 as an
> > > > > ordinary
> > > > > > > job. And the config does introduce some unnecessary confusion. Thank
> > > > > you
> > > > > > > for you comment. I will update the FLIP.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > > > > > On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
> > > > > [hidden email]>
> > > > > > > wrote:
> > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > >
> > > > > > > > > > > > If user submits Job 1 and generated a cached intermediate
> > > > > > > result. And later
> > > > > > > > > > > > on, user submitted job 2 which should ideally use the
> > > > > > > intermediate result.
> > > > > > > > > > > > In that case, if job 2 failed due to missing the intermediate
> > > > > > > result, Job 2
> > > > > > > > > > > > should be retried with its full DAG. After that when Job 2
> > > > > runs,
> > > > > > > it will
> > > > > > > > > > > > also re-generate the cache. However, once job 2 has fell
> > > > > back to
> > > > > > > the
> > > > > > > > > > > > original DAG, should it just be treated as an ordinary job
> > > > > that
> > > > > > > follow the
> > > > > > > > > > > > recovery strategy? Having a separate configuration seems a
> > > > > little
> > > > > > > > > > > > confusing. In another word, re-generating the cache is just a
> > > > > > > byproduct of
> > > > > > > > > > > > running the full DAG of job 2, but is not the main purpose.
> > > > > It
> > > > > > > is just like
> > > > > > > > > > > > when job 1 runs to generate cache, it does not have a
> > > > > separate
> > > > > > > config of
> > > > > > > > > > > > retry to make sure the cache is generated. If it fails, it
> > > > > just
> > > > > > > fail like
> > > > > > > > > > > > an ordinary job.
> > > > > > > > > > > >
> > > > > > > > > > > > What do you think?
> > > > > > > > > > > >
> > > > > > > > > > > > Thanks,
> > > > > > > > > > > >
> > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > >
> > > > > > > > > > > > On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
> > > > > > > [hidden email]> wrote:
> > > > > > > > > > > >
> > > > > > > > > > > > > Hi Becket,
> > > > > > > > > > > > >
> > > > > > > > > > > > > The intermediate result will indeed be automatically
> > > > > > > re-generated by
> > > > > > > > > > > > > resubmitting the original DAG. And that job could fail as
> > > > > > > well. In that
> > > > > > > > > > > > > case, we need to decide if we should resubmit the original
> > > > > DAG
> > > > > > > to
> > > > > > > > > > > > > re-generate the intermediate result or give up and throw an
> > > > > > > exception to
> > > > > > > > > > > > > the user. And the config is to indicate how many resubmit
> > > > > > > should happen
> > > > > > > > > > > > > before giving up.
> > > > > > > > > > > > >
> > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > Xuannan
> > > > > > > > > > > > >
> > > > > > > > > > > > > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
> > > > > > > [hidden email]> wrote:
> > > > > > > > > > > > >
> > > > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > > > mentioned. The
> > > > > > > > > > > > > users
> > > > > > > > > > > > > > > can use the cached table object returned by the
> > > > > .cache()
> > > > > > > method in
> > > > > > > > > > > > > other
> > > > > > > > > > > > > > > job and it should read the intermediate result. The
> > > > > > > intermediate result
> > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > gone in the following three cases: 1. the user
> > > > > explicitly
> > > > > > > call the
> > > > > > > > > > > > > > > invalidateCache() method 2. the TableEnvironment is
> > > > > closed
> > > > > > > 3. failure
> > > > > > > > > > > > > > > > happens on the TM. When that happens, the intermediate
> > > > > > > result will not
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > > available unless it is re-generated.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What confused me was that why do we need to have a
> > > > > > > *cache.retries.max
> > > > > > > > > > > > > > *config?
> > > > > > > > > > > > > > Shouldn't the missing intermediate result always be
> > > > > > > automatically
> > > > > > > > > > > > > > re-generated if it is gone?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
> > > > > > > [hidden email]>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Becket,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks for the comments.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
> > > > > > > [hidden email]>
> > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks for picking up the FLIP. It looks good to me
> > > > > > > overall. Some
> > > > > > > > > > > > > quick
> > > > > > > > > > > > > > > > comments / questions below:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 1. Do we also need changes in the Java API?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Yes, the public interface of Table and TableEnvironment
> > > > > > > should be made
> > > > > > > > > > > > > in
> > > > > > > > > > > > > > > the Java API.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > 2. What are the cases that users may want to retry
> > > > > > > reading the
> > > > > > > > > > > > > > > intermediate
> > > > > > > > > > > > > > > > result? It seems that once the intermediate result
> > > > > has
> > > > > > > gone, it will
> > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > available later without being generated again, right?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > > > mentioned. The
> > > > > > > > > > > > > > users
> > > > > > > > > > > > > > > can use the cached table object returned by the
> > > > > .cache()
> > > > > > > method in
> > > > > > > > > > > > > other
> > > > > > > > > > > > > > > job and it should read the intermediate result. The
> > > > > > > intermediate result
> > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > gone in the following three cases: 1. the user
> > > > > explicitly
> > > > > > > call the
> > > > > > > > > > > > > > > invalidateCache() method 2. the TableEnvironment is
> > > > > closed
> > > > > > > 3. failure
> > > > > > > > > > > > > > > happens on the TM. When that happens, the intermeidate
> > > > > > > result will not
> > > > > > > > > > > > > be
> > > > > > > > > > > > > > > available unless it is re-generated.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > 3. In the "semantic of cache() method" section, the
> > > > > > > description "The
> > > > > > > > > > > > > > > > semantic of the *cache() *method is a little
> > > > > different
> > > > > > > depending on
> > > > > > > > > > > > > > > whether
> > > > > > > > > > > > > > > > auto caching is enabled or not." seems not explained.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > This line is actually outdated and should be removed,
> > > > > as
> > > > > > > we are not
> > > > > > > > > > > > > > adding
> > > > > > > > > > > > > > > the auto caching functionality in this FLIP. Auto
> > > > > caching
> > > > > > > will be added
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > the future, and the semantic of cache() when auto
> > > > > caching
> > > > > > > is enabled
> > > > > > > > > > > > > will
> > > > > > > > > > > > > > > be discussed in detail by a new FLIP. I will remove the
> > > > > > > descriptor to
> > > > > > > > > > > > > > avoid
> > > > > > > > > > > > > > > further confusion.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
> > > > > > > [hidden email]>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi folks,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I'd like to start the discussion about FLIP-36
> > > > > Support
> > > > > > > Interactive
> > > > > > > > > > > > > > > > > Programming in Flink Table API
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > >
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > The FLIP proposes to add support for interactive
> > > > > > > programming in
> > > > > > > > > > > > > Flink
> > > > > > > > > > > > > > > > Table
> > > > > > > > > > > > > > > > > API. Specifically, it let users cache the
> > > > > intermediate
> > > > > > > > > > > > > > results(tables)
> > > > > > > > > > > > > > > > and
> > > > > > > > > > > > > > > > > use them in the later jobs.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Even though the FLIP has been discussed in the
> > > > > > > past[1], the FLIP
> > > > > > > > > > > > > > hasn't
> > > > > > > > > > > > > > > > > formally passed the vote yet. And some of the
> > > > > design
> > > > > > > and
> > > > > > > > > > > > > > implementation
> > > > > > > > > > > > > > > > > detail have to change to incorporates the cluster
> > > > > > > partition
> > > > > > > > > > > > > proposed
> > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > FLIP-67[2].
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > Xuannan
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > >
> > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
> > > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > >
> > > > > https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > >
> > > > > > > > > > > > >
> > > > > > >
> > > > >
> >
>

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Timo Walther-2
Hi Xuannan,

Thanks for the update. Here is some final feedback from my side. Maybe
others have some final feedback as well before we move on to a vote?

Some mistakes that we should fix in the FLIP:
- The FLIP declares `Table cache();` but I guess this should be
`CachedTable cache();` now.
- The FLIP mentions `JobGraphGenerator` a couple of times but I guess we
are talking about the StreamingJobGraphGenerator.
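
To make the first point concrete, here is a minimal sketch of the signature change, written against stand-in types rather than the real Flink classes. `invalidate()` and `getId()` are the methods suggested earlier in this thread; `SimpleTable`, `SimpleCachedTable`, `SourceOperation` and the "re-caching is a no-op" behavior are hypothetical and only there to make the example compile and run.

```java
import java.util.UUID;

// Stand-ins for the Flink types named in this thread; only the shape matters.
interface QueryOperation {}

final class SourceOperation implements QueryOperation {}

// cache() only wraps the original operation; the planner does the rest.
final class CacheOperation implements QueryOperation {
    final QueryOperation child;
    final String id = UUID.randomUUID().toString(); // unique string identifier

    CacheOperation(QueryOperation child) {
        this.child = child;
    }
}

interface Table {
    QueryOperation getQueryOperation();

    CachedTable cache(); // previously declared as: Table cache();
}

interface CachedTable extends Table {
    String getId();

    void invalidate();
}

class SimpleTable implements Table {
    private final QueryOperation operation;

    SimpleTable(QueryOperation operation) {
        this.operation = operation;
    }

    @Override
    public QueryOperation getQueryOperation() {
        return operation;
    }

    @Override
    public CachedTable cache() {
        return new SimpleCachedTable(new CacheOperation(operation));
    }
}

final class SimpleCachedTable extends SimpleTable implements CachedTable {
    private final CacheOperation cacheOperation;
    private boolean valid = true;

    SimpleCachedTable(CacheOperation cacheOperation) {
        super(cacheOperation);
        this.cacheOperation = cacheOperation;
    }

    @Override
    public String getId() {
        return cacheOperation.id;
    }

    @Override
    public void invalidate() {
        valid = false; // a real implementation would release the cluster partitions
    }

    boolean isValid() {
        return valid;
    }

    @Override
    public CachedTable cache() {
        return this; // assumption: caching an already cached table is a no-op
    }
}
```

With the narrower return type, `t1.cache().invalidate()` compiles without a cast, while a plain `Table` exposes no invalidation method at all.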

I know that the FLIP is already quite large, but it would be good to get
a bit more information about:

1. What does `ClusterPartitionDescriptor` look like?

2. Will we move `IntermediateDataSetID` and `ClusterPartitionDescriptor`
to flink-core?

3. Will `TableEnvironment` implement `AutoCloseable`?
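
For context on why this matters, a minimal sketch of the usage pattern that `java.lang.AutoCloseable` enables. `SketchTableEnvironment` and its methods are hypothetical stand-ins, not Flink API; the only behavior taken from this thread is that closing drops the cached tables and that the environment should not be used afterwards.

```java
import java.util.HashSet;
import java.util.Set;

// Stand-in for TableEnvironment; not the real Flink class.
final class SketchTableEnvironment implements AutoCloseable {
    private final Set<String> cachedTableIds = new HashSet<>();
    private boolean closed;

    void registerCache(String id) {
        if (closed) {
            throw new IllegalStateException("TableEnvironment is closed");
        }
        cachedTableIds.add(id);
    }

    int cacheCount() {
        return cachedTableIds.size();
    }

    boolean isClosed() {
        return closed;
    }

    // Narrowed to throw no checked exception, so callers need no catch block.
    @Override
    public void close() {
        cachedTableIds.clear(); // release all cached tables
        closed = true;
    }
}

final class AutoCloseableDemo {
    // Returns the environment after the try block so its state can be inspected.
    static SketchTableEnvironment runSession() {
        SketchTableEnvironment used;
        try (SketchTableEnvironment tEnv = new SketchTableEnvironment()) {
            tEnv.registerCache("t1");
            used = tEnv;
        } // close() is called here, even if the body throws
        return used;
    }

    public static void main(String[] args) {
        SketchTableEnvironment env = runSession();
        System.out.println("closed=" + env.isClosed() + ", caches=" + env.cacheCount());
    }
}
```

The try-with-resources form means cached tables are released even when the user code fails halfway through a session.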

4. "The TableEnvironment will fell back and resubmit the original DAG
without using the cache."

How do you imagine that? I guess the user needs to re-trigger the
execution, right?

5. "tables is submitted to in Per-Job mode, the returned
ClusterPartitionDescriptors will be ignored so that the planner will not
attempt to replace the subtree"

How do you imagine that? Where do you distinguish between per-job and
session mode?

6. You mentioned in your last mail "SQL integration in the future work
section."

I couldn't find this in the FLIP. Is the online FLIP the newest version?

7. Could you add an Implementation Plan section? It should explain how
this topic is split into subtasks and potential PRs. Ideally, those
subtasks are split by layer so that they can be reviewed by the individual
component teams.

Thanks,
Timo



On 25.08.20 12:45, Xuannan Su wrote:

> Hi Timo,
>
> Thanks for your comments. After the offline discussion, I have updated the FLIP with the following changes.
>
> 1. Update the end to end process
>      a. The Table.cache method should only wrap the original query operation with CacheOperation.
>      b. The planner will add the CacheSink or replace subtree with CacheSource while translating the QueryOperation to rel node. The CacheSink and CacheSource are treated as regular sinks and sources while the planner optimizes and translates the rel nodes.
>      c. The StreamGraphGenerator will recognize the CacheSink and CacheSource and generate the StreamGraph accordingly. When it sees the CacheSink, it will remove the CacheSink and set the cache flag in the StreamNode. When it sees the CacheSource, it will include the ClusterPartitionDescriptor in the StreamNode.
>      d. The JobGraph generator will not modify the graph. It only sets the result partition type of the intermediate result to BLOCKING_PERSISTENT if it sees a StreamNode with the cache flag set, or passes the ClusterPartitionDescriptor from the StreamNode to the JobVertex if the StreamNode has the ClusterPartitionDescriptor.
>      e. The ClusterPartitionDescriptor will be included in the JobResult and sent back to the client.
> 2. The metadata of the cached table will be stored in the CatalogManager instead of the TableEnvironment
> 3. The Table.cache method returns a CachedTable, which is a subclass of Table.
> 4. Add a paragraph to explain that the cached table will not work in a Per-Job mode cluster.
> 5. Add SQL integration and Cache Eviction in the future work section.
>
> As for the @Public and @PublicEvolving interfaces, I think they should be covered in the public interface section, i.e., Table and TableEnvironment. Other than those, all the changes should only affect internal classes.
>
> Please let me know if you have any further comments.
>
> Best,
> Xuannan
> On Aug 10, 2020, 9:32 PM +0800, Timo Walther <[hidden email]>, wrote:
>> Hi Xuannan,
>>
>> sorry for joining the discussion so late. I agree that this is a very
>> nice and useful feature. However, the impact it has on many components
>> in the stack requires more discussion in my opinion.
>>
>> 1) Separation of concerns:
>> The current design seems to mix different layers. We should make sure
>> that all layers do what they are supposed to do:
>>
>> 1a) The FLIP states: "The cache() method returns a new Table object with
>> a flag set."
>>
>> The `Table` object is just a thin API class that wraps a
>> `QueryOperation`. Other than that the `Table` object should not contain
>> further state. The tree of `QueryOperation` should be an immutable,
>> independent data structure that can be passed around and will eventually
>> be passed to the `Planner`.
>>
>> The mentioned `CacheSink` should be added by the optimizer. It is not
>> the responsibility of the API to perform optimizer-like tasks. A call to
>> `t1.cache()` should simply wrap the original operation into something
>> like `CacheOperation(t1.getQueryOperation)`. A `CacheOperation` could
>> still extend from `QueryOperation` and assign a unique string identifier
>> already. A specialized `StreamTransformation` would be necessary during
>> translation.
>>
>> 1b) The FLIP states: "The default table service stores the metadata in
>> the client (e.g. TableEnvironment)"
>>
>> `TableEnvironment` is not a client. Similar to `Table`, it is an API
>> class that just delegates to other session components. Currently, the
>> table environment has (again) too many responsibilities that should
>> better be split into other components. The table's `Executor` is the
>> client that performs the cluster communication. But in general it also
>> just delegates to `org.apache.flink.core.execution.PipelineExecutor`.
>> IMO the `PipelineExecutor` is a better fit for a back-and-forth
>> communication to determine existing cluster partitions and modify the
>> job graph. Or even further down the stack, because as far as I know,
>> `PipelineExecutor` works with `StreamGraph`.
>>
>> `flink-table-api-java` has no dependency on `flink-runtime`. This has
>> been done on purpose.
>>
>> 2) API
>> I still see the rejected option 2 as a good fit to expose this feature.
>>
>> A `Table.cache(): CachedTable` with `CachedTable.invalidate(): void` and
>> maybe `CachedTable.getId(): String` makes the feature and its operations
>> very explicit. It also avoids follow-up questions such as:
>>
>> Will `invalidateCache()` be transitively propagated in
>> `t1.cache().join(t2.cache()).invalidateCache()`?
>>
>> Or as the FLIP states:
>>
>> `Table t3 = t1.select(...) // cache will NOT be used.`
>> but
>> `t1.invalidateCache() // cache will be released`
>>
>> This sounds a bit contradictory to me. Because sometimes the
>> `t1.cache()` has implications on t1 and sometimes not.
>>
>> 3) Big picture
>>
>> After reading the FLIP, I still don't understand how a user can
>> configure or control the table service. Will we offer options through
>> `TableConfig` or `TableEnvironment` or is this configuration done via
>> ConfigOptions for lower layers?
>>
>> 4) SQL integration
>>
>> As I mentioned earlier, we should think about a SQL integration as well.
>> Otherwise we need to redesign the Java API to align it with SQL later.
>> SQL has also a bigger user base than Table API. Let's assume we
>> introduce a new keyword and combine the caching with regular CREATE VIEW
>> syntax such as:
>> `CREATE CACHED TEMPORARY VIEW MyTable AS SELECT *`
>> This would align well with:
>> `tEnv.registerTemporaryView("MyTable", table.cache())`
>>
>> What do you think?
>>
>> 5) TableEnvironment.close()
>>
>> Will `TableEnvironment` implement `AutoCloseable`?
>>
>>
>> In summary, I think the FLIP should go into more detail about how this
>> effort affects each layer, because a lot of the interfaces are `@Public`
>> or `@PublicEvolving`. The FLIP also still leaves a lot of questions about
>> how this high-level concept ends up in the JobGraph.
>>
>> Regards,
>> Timo
>>
>>
>>
>> On 30.07.20 09:00, Xuannan Su wrote:
>>> Hi folks,
>>>
>>> It seems that all the raised concerns so far have been resolved. I plan to start a voting thread for FLIP-36 early next week if there are no comments.
>>>
>>> Thanks,
>>> Xuannan
>>> On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <[hidden email]>, wrote:
>>>> Hi Kurt,
>>>>
>>>> Thanks for the comments.
>>>>
>>>> You are right that the FLIP lacks a proper discussion of the impact on the optimizer. I have added a section describing how the cached table works with the optimizer. I hope this resolves your concern. Please let me know if you have any further comments.
>>>>
>>>> Best,
>>>> Xuannan
>>>> On Jul 22, 2020, 4:36 PM +0800, Kurt Young <[hidden email]>, wrote:
>>>>> Thanks for the reply. I have one more comment about the impact on the
>>>>> optimizer. Even if you are trying to make the cached table as orthogonal
>>>>> to the optimizer as possible by introducing a special sink, it is still
>>>>> not clear why this approach is safe. Maybe you can add a description of
>>>>> the process from API to JobGraph; otherwise I can't be sure that everyone
>>>>> reviewing the design doc will have the same picture in mind. I'm also
>>>>> quite sure some of the existing mechanisms will be affected by this
>>>>> special sink, e.g. multi-sink optimization.
>>>>>
>>>>> Best,
>>>>> Kurt
>>>>>
>>>>>
>>>>> On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <[hidden email]> wrote:
>>>>>
>>>>>> Hi Kurt,
>>>>>>
>>>>>> Thanks for the comments.
>>>>>>
>>>>>> 1. How do you identify the CachedTable?
>>>>>> For the current design proposed in FLIP-36, we are using the first
>>>>>> approach you mentioned, where the key of the map is the Cached Table java
>>>>>> object. I think it is fine not to be able to identify another table
>>>>>> representing the same DAG and not using the cached intermediate result
>>>>>> because we want to make the caching table explicit. As mentioned in the
>>>>>> FLIP, the cache API will return a Table object. And the user has to use the
>>>>>> returned Table object to make use of the cached table. The rationale is
>>>>>> that if the user builds the same DAG from scratch with some
>>>>>> TableEnvironment instead of using the cached table object, the user
>>>>>> probably doesn't want to use the cache.
>>>>>>
>>>>>> 2. How does the CachedTable affect the optimizer?
>>>>>> We try to make the logic dealing with the cached table be as orthogonal to
>>>>>> the optimizer as possible. That's why we introduce a special sink when we
>>>>>> are going to cache a table and a special source when we are going to use a
>>>>>> cached table. This way, we can let the optimizer do its work, and the
>>>>>> logic of modifying the job graph can happen in the job graph generator. We
>>>>>> can recognize the cached node with the special sink and source.
>>>>>>
>>>>>> 3. What's the effect of calling TableEnvironment.close()?
>>>>>> We introduce the close method to prevent leaking of the cached table when
>>>>>> the user is done with the table environment. Therefore, it makes more sense
>>>>>> that the table environment, including all of its functionality, should not
>>>>>> be used after closing. Otherwise, we should rename the close method to
>>>>>> clearAllCache or something similar.
>>>>>>
>>>>>> And thanks for pointing out the use of a non-existent API in the given
>>>>>> examples. I have updated the examples in the FLIP accordingly.
>>>>>>
>>>>>> Best,
>>>>>> Xuannan
>>>>>> On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[hidden email]>, wrote:
>>>>>>> Hi Xuannan,
>>>>>>>
>>>>>>> Thanks for the detailed design doc. It describes clearly what the API
>>>>>>> looks like and how it interacts with the Flink runtime. However, the
>>>>>>> part that relates to SQL's optimizer is kind of blurry. To be more
>>>>>>> precise, I have the following questions:
>>>>>>>
>>>>>>> 1. How do you identify the CachedTable? I can imagine there would be a
>>>>>>> map representing the cache; how do you compare the keys of the map? One
>>>>>>> approach is to compare them as Java objects, which is simple but has
>>>>>>> limited scope. For example, if users create another table using some
>>>>>>> interfaces of TableEnvironment, and that table is exactly the same as
>>>>>>> the cached one, you won't be able to identify it. Another choice is
>>>>>>> calculating a "signature" or "digest" of the cached table, which
>>>>>>> involves a string representation of the whole subtree represented by
>>>>>>> the cached table. I don't think Flink currently provides such a
>>>>>>> mechanism around Table, though.
>>>>>>>
>>>>>>> 2. How does the CachedTable affect the optimizer? Specifically, will
>>>>>>> you have a dedicated QueryOperation for it, and will you have a
>>>>>>> dedicated logical & physical RelNode for it? I also don't see a
>>>>>>> description of how it works with the current optimization phases, from
>>>>>>> Operation to Calcite rel node, then to Flink's logical and physical
>>>>>>> nodes, which are finally translated to Flink's exec nodes. There are
>>>>>>> also other optimizations such as the deadlock breaker and sub-plan
>>>>>>> reuse inside the optimizer; I'm not sure whether the logic dealing with
>>>>>>> cached tables can be orthogonal to all of these. Hence I expect a more
>>>>>>> detailed description here.
>>>>>>>
>>>>>>> 3. What's the effect of calling TableEnvironment.close()? You already
>>>>>>> explained that this would drop all caches this table env has. Could you
>>>>>>> also explain whether other functionality still works for this table
>>>>>>> env? For example, can users still create/drop tables/databases/functions
>>>>>>> through this table env? What happens to the catalog and all temporary
>>>>>>> objects of this table env?
>>>>>>>
>>>>>>> One minor comment: I noticed you used a non-existent API in the
>>>>>>> examples you gave, like table.collect(), which is a little misleading.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi folks,
>>>>>>>>
>>>>>>>> I'd like to revive the discussion about FLIP-36 Support Interactive
>>>>>>>> Programming in Flink Table API
>>>>>>>>
>>>>>>>>
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>>>>>
>>>>>>>> The FLIP proposes to add support for interactive programming in Flink
>>>>>>>> Table API. Specifically, it lets users cache intermediate results
>>>>>>>> (tables) and use them in later jobs to avoid recomputing them.
>>>>>>>>
>>>>>>>> I am looking forward to any opinions and suggestions from the
>>>>>> community.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Xuannan
>>>>>>>> On May 7, 2020, 5:40 PM +0800, Xuannan Su <[hidden email]>, wrote:
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> There is some feedback from @Timo and @Kurt in the voting thread for
>>>>>>>>> FLIP-36, and I want to share my thoughts here.
>>>>>>>>>
>>>>>>>>> 1. What would FLIP-36 look like after FLIP-84?
>>>>>>>>> I don't think FLIP-84 will affect FLIP-36 from the public API
>>>>>>>>> perspective. Users can call .cache on a table object, and the cached
>>>>>>>>> table will be generated whenever the table job is triggered to
>>>>>>>>> execute, either by Table#executeInsert or StatementSet#execute. I
>>>>>>>>> think that FLIP-36 should be aware of the changes made by FLIP-84, but
>>>>>>>>> it shouldn't be a problem. At the end of the day, FLIP-36 only
>>>>>>>>> requires the ability to add a sink to a node, submit a table job with
>>>>>>>>> multiple sinks, and replace the cached table with a source.
>>>>>>>>>
>>>>>>>>> 2. How can we support cache in a multi-statement SQL file?
>>>>>>>>> The most intuitive way to support cache in a multi-statement SQL file
>>>>>>>>> is by using a view, where the view corresponds to a cached table.
>>>>>>>>>
>>>>>>>>> 3. Unifying the cached table and materialized views
>>>>>>>>> It is true that the cached table and the materialized view are similar
>>>>>>>>> in some ways. However, I think the materialized view is a more complex
>>>>>>>>> concept. First, a materialized view requires some kind of refresh
>>>>>>>>> mechanism to synchronize with the table. Second, the life cycle of a
>>>>>>>>> materialized view is longer. The materialized view should be
>>>>>>>>> accessible even after the application exits and should be accessible
>>>>>>>>> by another application, while the cached table is only accessible in
>>>>>>>>> the application where it is created. The cached table is introduced to
>>>>>>>>> avoid recomputation of an intermediate table to support interactive
>>>>>>>>> programming in Flink Table API. I think the materialized view needs
>>>>>>>>> more discussion and certainly deserves a whole new FLIP.
>>>>>>>>>
>>>>>>>>> Please let me know your thoughts.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Xuannan
>>>>>>>>>
>>>>>>>> On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]>
>>>>>> wrote:
>>>>>>>>> Hi folks,
>>>>>>>>>
>>>>>>>>> The FLIP-36 is updated according to the discussion with Becket. In
>>>>>> the
>>>>>>>> meantime, any comments are very welcome.
>>>>>>>>>
>>>>>>>>> If there are no further comments, I would like to start the voting
>>>>>>>>> thread by tomorrow.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Xuannan
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>
>>>>>>>>>>> You are right. It makes sense to treat the retry of job 2 as an
>>>>>>>>>>> ordinary job, and the config does introduce some unnecessary
>>>>>>>>>>> confusion. Thank you for your comment. I will update the FLIP.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Xuannan
>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
>>>>>> [hidden email]>
>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Suppose the user submits Job 1, which generates a cached
>>>>>>>>>>>>> intermediate result, and later submits Job 2, which should
>>>>>>>>>>>>> ideally use that intermediate result. In that case, if Job 2
>>>>>>>>>>>>> fails because the intermediate result is missing, Job 2 should
>>>>>>>>>>>>> be retried with its full DAG. When Job 2 then runs, it will
>>>>>>>>>>>>> also re-generate the cache. However, once Job 2 has fallen back
>>>>>>>>>>>>> to the original DAG, should it just be treated as an ordinary
>>>>>>>>>>>>> job that follows the recovery strategy? Having a separate
>>>>>>>>>>>>> configuration seems a little confusing. In other words,
>>>>>>>>>>>>> re-generating the cache is just a byproduct of running the full
>>>>>>>>>>>>> DAG of Job 2, not its main purpose. It is just like when Job 1
>>>>>>>>>>>>> runs to generate the cache: it does not have a separate retry
>>>>>>>>>>>>> config to make sure the cache is generated. If it fails, it
>>>>>>>>>>>>> just fails like an ordinary job.
>>>>>>>>>>>>>
>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> The intermediate result will indeed be automatically
>>>>>>>>>>>>>> re-generated by resubmitting the original DAG. That job could
>>>>>>>>>>>>>> fail as well. In that case, we need to decide whether to
>>>>>>>>>>>>>> resubmit the original DAG again to re-generate the
>>>>>>>>>>>>>> intermediate result or to give up and throw an exception to
>>>>>>>>>>>>>> the user. The config indicates how many resubmissions should
>>>>>>>>>>>>>> happen before giving up.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Xuannan
>>>>>>>>>>>>>>

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Timo,

Thanks for pointing out the mistakes; I have updated the FLIP accordingly.

I also added more information to the FLIP to address the questions you raised in your email.

To sum it up:
> 1. How does `ClusterPartitionDescriptor` look like?
The ClusterPartitionDescriptor will include the ShuffleDescriptor and some meta information, i.e., numberOfSubpartitions, partitionType, and the IntermediateDataSetID. Since it contains a runtime class, ShuffleDescriptor, it should be serialized before being sent back to the client via JobResult.
> 2. Will we move `IntermediateDataSetID` and `ClusterPartitionDescriptor`
> to flink-core?
The ClusterPartitionDescriptor should not move to flink-core, as it contains runtime information (ShuffleDescriptor). Instead, we will have a ClusterPartitionDescriptor interface in flink-core and keep the implementation in flink-runtime. The IntermediateDataSetID doesn't have to move to flink-core either: it is a subclass of AbstractID, which is already in flink-core, so the client can hold the IntermediateDataSetID as an AbstractID.
> 3. Will `TableEnvironment` implement `AutoClosable`?
The TableEnvironment will implement AutoCloseable. However, it is still the user's responsibility to use the try-with-resources pattern; otherwise, they need to call the close method explicitly at the end.
> 4. "The TableEnvironment will fell back and resubmit the original DAG
> without using the cache."
The CatalogManager in the TableEnvironment should have the original operation tree of the cached node so that we can resubmit the original DAG. And the whole process should be transparent to the user, as stated in the FLIP.
> 5. "tables is submitted to in Per-Job mode, the returned
> ClusterPartitionDescriptors will be ignored so that the planner will not
> attempt to replace the subtree"
> How do you imagine that? Where do you distinguish between per-job and
> session mode?
The StreamExecutionEnvironment can distinguish between per-job and session mode by the type of the PipelineExecutor, i.e., AbstractJobClusterExecutor vs. AbstractSessionClusterExecutor.
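
To make point 5 a bit more concrete, here is a minimal sketch of the kind of type check I have in mind. The classes below are hypothetical stand-ins that only mirror the executor names mentioned above (they are not the real Flink classes), and the method name shouldKeepClusterPartitionDescriptors is made up for illustration.

```java
// Hypothetical stand-ins that only mirror the executor names mentioned above;
// they are NOT the real Flink classes.
interface PipelineExecutorSketch {}

abstract class AbstractJobClusterExecutorSketch implements PipelineExecutorSketch {}

abstract class AbstractSessionClusterExecutorSketch implements PipelineExecutorSketch {}

final class PerJobExecutorSketch extends AbstractJobClusterExecutorSketch {}

final class SessionExecutorSketch extends AbstractSessionClusterExecutorSketch {}

final class CacheModeSketch {
    // Returns true only for session-mode executors. For a per-job executor the
    // returned descriptors would be ignored, so the planner never tries to
    // replace a subtree with a cached source.
    static boolean shouldKeepClusterPartitionDescriptors(PipelineExecutorSketch executor) {
        return executor instanceof AbstractSessionClusterExecutorSketch;
    }

    public static void main(String[] args) {
        System.out.println(shouldKeepClusterPartitionDescriptors(new SessionExecutorSketch()));
        System.out.println(shouldKeepClusterPartitionDescriptors(new PerJobExecutorSketch()));
    }
}
```

The actual check in the FLIP may well live elsewhere or look different; the point is only that the decision can be made from the executor type without any extra configuration.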

An implementation plan is added at the end of the FLIP as well. Please have a look.
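
As a small illustration of point 3 above, this is roughly how the try-with-resources pattern would release the cached tables. SketchTableEnvironment is a hypothetical stand-in, not the real TableEnvironment, and its cache(String) method is invented purely for this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for an environment that owns cached tables.
// It is NOT the real TableEnvironment API.
final class SketchTableEnvironment implements AutoCloseable {
    private final List<String> cachedTables = new ArrayList<>();
    private boolean closed = false;

    // Records a cached table by name; a real environment would track
    // cluster partitions rather than plain names.
    void cache(String tableName) {
        if (closed) {
            throw new IllegalStateException("environment is closed");
        }
        cachedTables.add(tableName);
    }

    int cachedTableCount() {
        return cachedTables.size();
    }

    boolean isClosed() {
        return closed;
    }

    // Releases every cached table and marks the environment unusable.
    @Override
    public void close() {
        cachedTables.clear();
        closed = true;
    }
}

final class CloseSketch {
    // close() runs automatically when the try block exits,
    // even if the body throws.
    static SketchTableEnvironment runWithTryWithResources() {
        SketchTableEnvironment env = new SketchTableEnvironment();
        try (SketchTableEnvironment scoped = env) {
            scoped.cache("t1");
        }
        return env;
    }

    public static void main(String[] args) {
        SketchTableEnvironment env = runWithTryWithResources();
        System.out.println("closed=" + env.isClosed() + ", cached=" + env.cachedTableCount());
    }
}
```

Users who do not use try-with-resources would have to call close() themselves, which is the responsibility mentioned in point 3.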

Best,
Xuannan
On Sep 4, 2020, 7:29 PM +0800, Timo Walther wrote:

> Hi Xuannan,
>
> thanks for the update. Here is some final feedback from my side. Maybe
> others have some final feedback as well before we continue to a vote?
>
> Some mistakes that we should fix in the FLIP:
> - The FLIP declares `Table cache();` but I guess this should be
> `CachedTable cache();` now.
> - The FLIP mentions `JobGraphGenerator` a couple of times but I guess we
> are talking about the StreamingJobGraphGenerator.
>
> I know that the FLIP is already quite large, but it would be good to get
> a bit more information about:
>
> 1. How does `ClusterPartitionDescriptor` look like?
>
> 2. Will we move `IntermediateDataSetID` and `ClusterPartitionDescriptor`
> to flink-core?
>
> 3. Will `TableEnvironment` implement `AutoClosable`?
>
> 4. "The TableEnvironment will fell back and resubmit the original DAG
> without using the cache."
>
> How do you imagine that? I guess the user needs to re-trigger the
> execution, right?
>
> 5. "tables is submitted to in Per-Job mode, the returned
> ClusterPartitionDescriptors will be ignored so that the planner will not
> attempt to replace the subtree"
>
> How do you imagine that? Where do you distinguish between per-job and
> session mode?
>
> 6. You mentioned in you last mail "SQL integration in the future work
> section."
>
> I couldn't find this in the FLIP. Is the online FLIP the newest version?
>
> 7. Could you add an Implementation Plan section? It should explain how
> this topic is split into the subtasks and potential PRs. Ideally, those
> subtasks are split by layer to be reviewed by the individual component
> teams.
>
> Thanks,
> Timo
>
>
>
> On 25.08.20 12:45, Xuannan Su wrote:
> > Hi Timo,
> >
> > Thanks for your comments. After the offline discussion, I have updated the FLIP with the following change.
> >
> > 1. Update the end to end process
> > a. The Table.cache method should only wrap the origin query operation with CacheOperation.
> > b. The planner will add the CacheSink or replace subtree with CacheSource while translating the QueryOperation to rel node. The CacheSink and CacheSource are treated as regular sinks and sources while the planner optimizes and translates the rel nodes.
> > c. The StreamGraphGenerator will recognize the CacheSink and CacheSource and generate the StreamGraph accordingly. When it sees the CacheSink, it will remove the CacheSink and set the cache flag in the StreamNode. When it sees the CacheSource, it will include the ClusterPartitionDescriptor in the StreamNode.
> > d. The JobGraph generator will not modify the graph. It only set the result partition type of the intermediate result to BLOCKING_PERSISTENT if it sees a StreamNode with the cache flag set or passes the ClusterPartitionDescriptor from the StreamNode to the JobVertex if the StreamNode has the ClusterPartitionDescriptor.
> > e. The ClusterPartitionDescriptor will be included in the JobResult and sent back to the client.
> > 2. The metadata of the cached table will be stored in the CatalogManager instead of the TableEnvironment
> > 3. The Table.cache method returns a CachedTable, which is a subclass of Table.
> > 4. Add a paragraph to explain that the cache table will not work in Per-Job Mode cluster.
> > 5. Add SQL integration and Cache Eviction in the future work section.
> >
> > As of the @Public and @PublicEvolving interface, I think it should be covered in the public interface section, i.e., Table and TableEnvironment. Other than those, all the changes should only affect the internal class.
> >
> > Please let me know if you have any further comments.
> >
> > Best,
> > Xuannan
> > On Aug 10, 2020, 9:32 PM +0800, Timo Walther <[hidden email]>, wrote:
> > > Hi Xuannan,
> > >
> > > sorry for joining the discussion so late. I agree that this is a very
> > > nice and useful feature. However, the impact it has to many components
> > > in the stack requires more discussion in my opinion.
> > >
> > > 1) Separation of concerns:
> > > The current design seems to mix different layers. We should make sure
> > > that all layers do what they are supposed to do:
> > >
> > > 1a) The FLIP states: "The cache() method returns a new Table object with
> > > a flag set."
> > >
> > > The `Table` object is just a thin API class that wraps a
> > > `QueryOperation`. Other than that the `Table` object should not contain
> > > further state. The tree of `QueryOperation` should be an immutable,
> > > independent data structure that can be passed around and will eventually
> > > be passed to the `Planner`.
> > >
> > > The mentioned `CacheSink` should be added by the optimizer. It is not
> > > the responsibility of the API to perform optimizer-like tasks. A call to
> > > `t1.cache()` should simply wrap the original operation into something
> > > like `CacheOperation(t1.getQueryOperation)`. A `CacheOperation` could
> > > still extend from `QueryOperation` and assign a unique string identifier
> > > already. A specialized `StreamTransformation` would be necessary during
> > > translation.
> > >
> > > 1b) The FLIP states: "The default table service stores the metadata in
> > > the client (e.g. TableEnvironment)"
> > >
> > > `TableEnvironment` is not a client. Similar to `Table`, it is an API
> > > class that just delegates to other session components. Currently, the
> > > table environment has (again) too many responsibilities that should
> > > better be split into other components. The table's `Executor` is the
> > > client that performs the cluster communication. But in general it also
> > > just delegates to `org.apache.flink.core.execution.PipelineExecutor`.
> > > IMO the `PipelineExecutor` is a better fit for a back-and-forth
> > > communication to determine existing cluster partitions and modify the
> > > job graph. Or even further down the stack, because as far as I know,
> > > `PipelineExecutor` works with `StreamGraph`.
> > >
> > > `flink-table-api-java` has no dependency to `flink-runtime`. This has
> > > been done on purpose.
> > >
> > > 2) API
> > > I still see the rejected option 2 a good fit to expose this feature.
> > >
> > > A `Table.cache(): CachedTable` with `CachedTable.invalidate(): void` and
> > > maybe `CachedTable.getId(): String` makes the feature and its operations
> > > very explicit. It also avoids following up questions such as:
> > >
> > > Will `invalidateCache()` be transitively propagated in
> > > `t1.cache().join(t2.cache()).invalidateCache()`?
> > >
> > > Or as the FLIP states:
> > >
> > > `Table t3 = t1.select(...) // cache will NOT be used.`
> > > but
> > > `t1.invalidateCache() // cache will be released`
> > >
> > > This sounds a bit contradicting to me. Because sometimes the
> > > `t1.cache()` has implications on t1 and sometimes not.
> > >
> > > 3) Big picture
> > >
> > > After reading the FLIP, I still don't understand how a user can
> > > configure or control the table service. Will we offer options through
> > > `TableConfig` or `TableEnvironment` or is this configuration done via
> > > ConfigOptions for lower layers?
> > >
> > > 4) SQL integration
> > >
> > > As I mentioned earlier, we should think about a SQL integration as well.
> > > Otherwise we need to redesign the Java API to align it with SQL later.
> > > SQL has also a bigger user base than Table API. Let's assume we
> > > introduce a new keyword and combine the caching with regular CREATE VIEW
> > > syntax such as:
> > > `CREATE CACHED TEMPORARY VIEW MyTable AS SELECT *`
> > > This would align well with:
> > > `tEnv.registerTemporaryView("MyTable", table.cache())`
> > >
> > > What do you think?
> > >
> > > 4) TableEnvironment.close()
> > >
> > > Will `TableEnvironment` implement `AutoCloseable`?
> > >
> > >
> > > In summary, I think the FLIP should go into more details how this effort
> > > affects each layer. Because a lot of the interfaces are `@Public` or
> > > `@PublicEvolving`. And the FLIP still leaves a lot of questions how this
> > > high level concept ends up in JobGraph.
> > >
> > > Regards,
> > > Timo
> > >
> > >
> > >
> > > On 30.07.20 09:00, Xuannan Su wrote:
> > > > Hi folks,
> > > >
> > > > It seems that all the raised concerns so far have been resolved. I plan to start a voting thread for FLIP-36 early next week if there are no comments.
> > > >
> > > > Thanks,
> > > > Xuannan
> > > > On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <[hidden email]>, wrote:
> > > > > Hi Kurt,
> > > > >
> > > > > Thanks for the comments.
> > > > >
> > > > > You are right that the FLIP lacks a proper discussion about the impact of the optimizer. I have added the section to talk about how the cache table works with the optimizer. I hope this could resolve your concern. Please let me know if you have any further comments.
> > > > >
> > > > > Best,
> > > > > Xuannan
> > > > > On Jul 22, 2020, 4:36 PM +0800, Kurt Young <[hidden email]>, wrote:
> > > > > > Thanks for the reply, I have one more comment about the optimizer
> > > > > > affection. Even if you are
> > > > > > trying to make the cached table be as orthogonal to the optimizer as
> > > > > > possible by introducing
> > > > > > a special sink, it is still not clear why this approach is safe. Maybe you
> > > > > > can add some process
> > > > > > introduction from API to JobGraph, otherwise I can't make sure everyone
> > > > > > reviewing the design
> > > > > > doc will have the same imagination about this. And I'm also quite sure some
> > > > > > of the existing
> > > > > > mechanism will be affected by this special sink, e.g. multi sink
> > > > > > optimization.
> > > > > >
> > > > > > Best,
> > > > > > Kurt
> > > > > >
> > > > > >
> > > > > > On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <[hidden email]> wrote:
> > > > > >
> > > > > > > Hi Kurt,
> > > > > > >
> > > > > > > Thanks for the comments.
> > > > > > >
> > > > > > > 1. How do you identify the CachedTable?
> > > > > > > For the current design proposed in FLIP-36, we are using the first
> > > > > > > approach you mentioned, where the key of the map is the Cached Table java
> > > > > > > object. I think it is fine not to be able to identify another table
> > > > > > > representing the same DAG and not using the cached intermediate result
> > > > > > > because we want to make the caching table explicit. As mentioned in the
> > > > > > > FLIP, the cache API will return a Table object. And the user has to use the
> > > > > > > returned Table object to make use of the cached table. The rationale is
> > > > > > > that if the user builds the same DAG from scratch with some
> > > > > > > TableEnvironment instead of using the cached table object, the user
> > > > > > > probably doesn't want to use the cache.
> > > > > > >
> > > > > > > 2. How does the CachedTable affect the optimizer?
> > > > > > > We try to make the logic dealing with the cached table be as orthogonal to
> > > > > > > the optimizer as possible. That's why we introduce a special sink when we
> > > > > > > are going to cache a table and a special source when we are going to use a
> > > > > > > cached table. This way, we can let the optimizer does it works, and the
> > > > > > > logic of modifying the job graph can happen in the job graph generator. We
> > > > > > > can recognize the cached node with the special sink and source.
> > > > > > >
> > > > > > > 3. What's the effect of calling TableEnvironment.close()?
> > > > > > > We introduce the close method to prevent leaking of the cached table when
> > > > > > > the user is done with the table environment. Therefore, it makes more sense
> > > > > > > that the table environment, including all of its functionality, should not
> > > > > > > be used after closing. Otherwise, we should rename the close method to
> > > > > > > clearAllCache or something similar.
> > > > > > >
> > > > > > > And thanks for pointing out the use of not existing API used in the given
> > > > > > > examples. I have updated the examples in the FLIP accordingly.
> > > > > > >
> > > > > > > Best,
> > > > > > > Xuannan
> > > > > > > On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[hidden email]>, wrote:
> > > > > > > > Hi Xuannan,
> > > > > > > >
> > > > > > > > Thanks for the detailed design doc, it described clearly how the API
> > > > > > > looks
> > > > > > > > and how to interact with Flink runtime.
> > > > > > > > However, the part which relates to SQL's optimizer is kind of blurry. To
> > > > > > > be
> > > > > > > > more precise, I have following questions:
> > > > > > > >
> > > > > > > > 1. How do you identify the CachedTable? I can imagine there would be map
> > > > > > > > representing the cache, how do you
> > > > > > > > compare the keys of the map? One approach is they will be compared by
> > > > > > > java
> > > > > > > > objects, which is simple but has
> > > > > > > > limited scope. For example, users created another table using some
> > > > > > > > interfaces of TableEnvironment, and the table
> > > > > > > > is exactly the same as the cached one, you won't be able to identify it.
> > > > > > > > Another choice is calculating the "signature" or
> > > > > > > > "digest" of the cached table, which involves string representation of the
> > > > > > > > whole sub tree represented by the cached table.
> > > > > > > > I don't think Flink currently provides such a mechanism around Table
> > > > > > > > though.
> > > > > > > >
> > > > > > > > 2. How does the CachedTable affect the optimizer? Specifically, will you
> > > > > > > > have a dedicated QueryOperation for it, will you have
> > > > > > > > a dedicated logical & physical RelNode for it? And I also don't see a
> > > > > > > > description about how to work with current optimize phases,
> > > > > > > > from Operation to Calcite rel node, and then to Flink's logical and
> > > > > > > > physical node, which will be at last translated to Flink's exec node.
> > > > > > > > There also exists other optimizations such as dead lock breaker, as well
> > > > > > > as
> > > > > > > > sub plan reuse inside the optimizer, I'm not sure whether
> > > > > > > > the logic dealing with cached tables can be orthogonal to all of these.
> > > > > > > > Hence I expect you could have a more detailed description here.
> > > > > > > >
> > > > > > > > 3. What's the effect of calling TableEnvironment.close()? You already
> > > > > > > > explained this would drop all caches this table env has,
> > > > > > > > could you also explain where other functionality still works for this
> > > > > > > table
> > > > > > > > env? Like can use still create/drop tables/databases/function
> > > > > > > > through this table env? What happens to the catalog and all temporary
> > > > > > > > objects of this table env?
> > > > > > > >
> > > > > > > > One minor comment: I noticed you used some not existing API in the
> > > > > > > examples
> > > > > > > > you gave, like table.collect(), which is a little
> > > > > > > > misleading.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Kurt
> > > > > > > >
> > > > > > > >
> > > > > > > > On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
> > > > > > > >
> > > > > > > > > Hi folks,
> > > > > > > > >
> > > > > > > > > I'd like to revive the discussion about FLIP-36 Support Interactive
> > > > > > > > > Programming in Flink Table API
> > > > > > > > >
> > > > > > > > >
> > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > >
> > > > > > > > > The FLIP proposes to add support for interactive programming in Flink
> > > > > > > > > Table API. Specifically, it let users cache the intermediate
> > > > > > > > > results(tables) and use them in the later jobs to avoid recomputing the
> > > > > > > > > intermediate result(tables).
> > > > > > > > >
> > > > > > > > > I am looking forward to any opinions and suggestions from the
> > > > > > > community.
> > > > > > > > >
> > > > > > > > > Best,
> > > > > > > > > Xuannan
> > > > > > > > > On May 7, 2020, 5:40 PM +0800, Xuannan Su <[hidden email]>,
> > > > > > > wrote:
> > > > > > > > > > Hi,
> > > > > > > > > >
> > > > > > > > > > There are some feedbacks from @Timo and @Kurt in the voting thread
> > > > > > > for
> > > > > > > > > FLIP-36 and I want to share my thoughts here.
> > > > > > > > > >
> > > > > > > > > > 1. How would the FLIP-36 look like after FLIP-84?
> > > > > > > > > > I don't think FLIP-84 will affect FLIP-36 from the public API
> > > > > > > > > perspective. Users can call .cache on a table object and the cached
> > > > > > > table
> > > > > > > > > will be generated whenever the table job is triggered to execute,
> > > > > > > either by
> > > > > > > > > Table#executeInsert or StatementSet#execute. I think that FLIP-36
> > > > > > > should
> > > > > > > > > aware of the changes made by FLIP-84, but it shouldn't be a problem.
> > > > > > > At the
> > > > > > > > > end of the day, FLIP-36 only requires the ability to add a sink to a
> > > > > > > node,
> > > > > > > > > submit a table job with multiple sinks, and replace the cached table
> > > > > > > with a
> > > > > > > > > source.
> > > > > > > > > >
> > > > > > > > > > 2. How can we support cache in a multi-statement SQL file?
> > > > > > > > > > The most intuitive way to support cache in a multi-statement SQL
> > > > > > > file is
> > > > > > > > > by using a view, where the view is corresponding to a cached table.
> > > > > > > > > >
> > > > > > > > > > 3. Unifying the cached table and materialized views
> > > > > > > > > > It is true that the cached table and the materialized view are
> > > > > > > similar
> > > > > > > > > in some way. However, I think the materialized view is a more complex
> > > > > > > > > concept. First, a materialized view requires some kind of a refresh
> > > > > > > > > mechanism to synchronize with the table. Secondly, the life cycle of a
> > > > > > > > > materialized view is longer. The materialized view should be accessible
> > > > > > > > > even after the application exits and should be accessible by another
> > > > > > > > > application, while the cached table is only accessible in the
> > > > > > > application
> > > > > > > > > where it is created. The cached table is introduced to avoid
> > > > > > > recomputation
> > > > > > > > > of an intermediate table to support interactive programming in Flink
> > > > > > > Table
> > > > > > > > > API. And I think the materialized view needs more discussion and
> > > > > > > certainly
> > > > > > > > > deserves a whole new FLIP.
> > > > > > > > > >
> > > > > > > > > > Please let me know your thought.
> > > > > > > > > >
> > > > > > > > > > Best,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > > > On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]>
> > > > > > > wrote:
> > > > > > > > > > Hi folks,
> > > > > > > > > >
> > > > > > > > > > The FLIP-36 is updated according to the discussion with Becket. In
> > > > > > > the
> > > > > > > > > meantime, any comments are very welcome.
> > > > > > > > > >
> > > > > > > > > > If there are no further comments, I would like to start the voting
> > > > > > > > > > thread by tomorrow.
> > > > > > > > > >
> > > > > > > > > > Thanks,
> > > > > > > > > > Xuannan
> > > > > > > > > >
> > > > > > > > > >
> > > > > > > > > > > On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]>
> > > > > > > > > wrote:
> > > > > > > > > > > > Hi Becket,
> > > > > > > > > > > >
> > > > > > > > > > > > You are right. It makes sense to treat retry of job 2 as an
> > > > > > > ordinary
> > > > > > > > > job. And the config does introduce some unnecessary confusion. Thank
> > > > > > > you
> > > > > > > > > for you comment. I will update the FLIP.
> > > > > > > > > > > >
> > > > > > > > > > > > Best,
> > > > > > > > > > > > Xuannan
> > > > > > > > > > > >
> > > > > > > > > > > > > On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
> > > > > > > [hidden email]>
> > > > > > > > > wrote:
> > > > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > If user submits Job 1 and generated a cached intermediate
> > > > > > > > > result. And later
> > > > > > > > > > > > > > on, user submitted job 2 which should ideally use the
> > > > > > > > > intermediate result.
> > > > > > > > > > > > > > In that case, if job 2 failed due to missing the intermediate
> > > > > > > > > result, Job 2
> > > > > > > > > > > > > > should be retried with its full DAG. After that when Job 2
> > > > > > > runs,
> > > > > > > > > it will
> > > > > > > > > > > > > > also re-generate the cache. However, once job 2 has fell
> > > > > > > back to
> > > > > > > > > the
> > > > > > > > > > > > > > original DAG, should it just be treated as an ordinary job
> > > > > > > that
> > > > > > > > > follow the
> > > > > > > > > > > > > > recovery strategy? Having a separate configuration seems a
> > > > > > > little
> > > > > > > > > > > > > > confusing. In another word, re-generating the cache is just a
> > > > > > > > > byproduct of
> > > > > > > > > > > > > > running the full DAG of job 2, but is not the main purpose.
> > > > > > > It
> > > > > > > > > is just like
> > > > > > > > > > > > > > when job 1 runs to generate cache, it does not have a
> > > > > > > separate
> > > > > > > > > config of
> > > > > > > > > > > > > > retry to make sure the cache is generated. If it fails, it
> > > > > > > just
> > > > > > > > > fail like
> > > > > > > > > > > > > > an ordinary job.
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > What do you think?
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
> > > > > > > > > [hidden email]> wrote:
> > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Hi Becket,
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > The intermediate result will indeed be automatically
> > > > > > > > > re-generated by
> > > > > > > > > > > > > > > resubmitting the original DAG. And that job could fail as
> > > > > > > > > well. In that
> > > > > > > > > > > > > > > case, we need to decide if we should resubmit the original
> > > > > > > DAG
> > > > > > > > > to
> > > > > > > > > > > > > > > re-generate the intermediate result or give up and throw an
> > > > > > > > > exception to
> > > > > > > > > > > > > > > the user. And the config is to indicate how many resubmit
> > > > > > > > > should happen
> > > > > > > > > > > > > > > before giving up.
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > Xuannan
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
> > > > > > > > > [hidden email]> wrote:
> > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > > > > > mentioned. The
> > > > > > > > > > > > > > > users
> > > > > > > > > > > > > > > > > can use the cached table object returned by the
> > > > > > > .cache()
> > > > > > > > > method in
> > > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > > job and it should read the intermediate result. The
> > > > > > > > > intermediate result
> > > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > > gone in the following three cases: 1. the user
> > > > > > > explicitly
> > > > > > > > > call the
> > > > > > > > > > > > > > > > > invalidateCache() method 2. the TableEnvironment is
> > > > > > > closed
> > > > > > > > > 3. failure
> > > > > > > > > > > > > > > > > > happens on the TM. When that happens, the intermediate
> > > > > > > > > result will not
> > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > available unless it is re-generated.
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > What confused me was that why do we need to have a
> > > > > > > > > *cache.retries.max
> > > > > > > > > > > > > > > > *config?
> > > > > > > > > > > > > > > > Shouldn't the missing intermediate result always be
> > > > > > > > > automatically
> > > > > > > > > > > > > > > > re-generated if it is gone?
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
> > > > > > > > > [hidden email]>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Hi Becket,
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Thanks for the comments.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
> > > > > > > > > [hidden email]>
> > > > > > > > > > > > > > > wrote:
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Hi Xuannan,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks for picking up the FLIP. It looks good to me
> > > > > > > > > overall. Some
> > > > > > > > > > > > > > > quick
> > > > > > > > > > > > > > > > > > comments / questions below:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 1. Do we also need changes in the Java API?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > Yes, the public interface of Table and TableEnvironment
> > > > > > > > > should be made
> > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > the Java API.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > 2. What are the cases that users may want to retry
> > > > > > > > > reading the
> > > > > > > > > > > > > > > > > intermediate
> > > > > > > > > > > > > > > > > > result? It seems that once the intermediate result
> > > > > > > has
> > > > > > > > > gone, it will
> > > > > > > > > > > > > > > > not
> > > > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > > available later without being generated again, right?
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > I am not entirely sure if I understand the cases you
> > > > > > > > > mentioned. The
> > > > > > > > > > > > > > > > users
> > > > > > > > > > > > > > > > > can use the cached table object returned by the
> > > > > > > .cache()
> > > > > > > > > method in
> > > > > > > > > > > > > > > other
> > > > > > > > > > > > > > > > > job and it should read the intermediate result. The
> > > > > > > > > intermediate result
> > > > > > > > > > > > > > > > can
> > > > > > > > > > > > > > > > > gone in the following three cases: 1. the user
> > > > > > > explicitly
> > > > > > > > > call the
> > > > > > > > > > > > > > > > > invalidateCache() method 2. the TableEnvironment is
> > > > > > > closed
> > > > > > > > > 3. failure
> > > > > > > > > > > > > > > > > > happens on the TM. When that happens, the intermediate
> > > > > > > > > result will not
> > > > > > > > > > > > > > > be
> > > > > > > > > > > > > > > > > available unless it is re-generated.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > 3. In the "semantic of cache() method" section, the
> > > > > > > > > description "The
> > > > > > > > > > > > > > > > > > semantic of the *cache() *method is a little
> > > > > > > different
> > > > > > > > > depending on
> > > > > > > > > > > > > > > > > whether
> > > > > > > > > > > > > > > > > > auto caching is enabled or not." seems not explained.
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > This line is actually outdated and should be removed,
> > > > > > > as
> > > > > > > > > we are not
> > > > > > > > > > > > > > > > adding
> > > > > > > > > > > > > > > > > the auto caching functionality in this FLIP. Auto
> > > > > > > caching
> > > > > > > > > will be added
> > > > > > > > > > > > > > > > in
> > > > > > > > > > > > > > > > > the future, and the semantic of cache() when auto
> > > > > > > caching
> > > > > > > > > is enabled
> > > > > > > > > > > > > > > will
> > > > > > > > > > > > > > > > > be discussed in detail by a new FLIP. I will remove the
> > > > > > > > > descriptor to
> > > > > > > > > > > > > > > > avoid
> > > > > > > > > > > > > > > > > further confusion.
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > Jiangjie (Becket) Qin
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Hi folks,
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > I'd like to start the discussion about FLIP-36 Support Interactive
> > > > > > > > > > > > > > > > > > > > Programming in Flink Table API
> > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > The FLIP proposes to add support for interactive programming in Flink Table
> > > > > > > > > > > > > > > > > > > > API. Specifically, it lets users cache intermediate results (tables) and
> > > > > > > > > > > > > > > > > > > > use them in later jobs.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > Even though the FLIP has been discussed in the past[1], it hasn't
> > > > > > > > > > > > > > > > > > > > formally passed the vote yet. Some of the design and implementation
> > > > > > > > > > > > > > > > > > > > details also have to change to incorporate the cluster partitions
> > > > > > > > > > > > > > > > > > > > proposed in FLIP-67[2].
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Looking forward to your feedback.
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > Thanks,
> > > > > > > > > > > > > > > > > > > Xuannan
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > > > > [1]
> > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
> > > > > > > > > > > > > > > > > > > > [2]
> > > > > > > > > > > > > > > > > > > > https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
> > > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > > >
> > > > > > > > > > > > > > >
> > > > > > > > >
> > > > > > >
> > > >
> > >
> >
>