[DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Timo Walther-2
Hi Xuannan,

Thanks for updating the FLIP and answering my questions.

The FLIP is good to go from my side. What do others think?

Regards,
Timo


On 10.09.20 09:00, Xuannan Su wrote:

> Hi Timo,
>
> Thanks for pointing out the mistake, I have made the update accordingly.
>
> And I added more information to the FLIP to address the problem you raised in the email.
>
> To sum it up:
>> 1. What does `ClusterPartitionDescriptor` look like?
> The ClusterPartitionDescriptor will include the ShuffleDescriptor and some meta information, i.e., numberOfSubpartitions, partitionType, and the IntermediateDataSetID. Since it contains a runtime class, ShuffleDescriptor, it should be serialized before being sent back to the client via JobResult.
>> 2. Will we move `IntermediateDataSetID` and `ClusterPartitionDescriptor`
>> to flink-core?
> For ClusterPartitionDescriptor, it should not move to flink-core as it contains some runtime information (ShuffleDescriptor). Instead, we will have a ClusterPartitionDescriptor interface in flink-core and have the implementation in flink-runtime. The IntermediateDataSetID doesn’t have to move to flink-core. The IntermediateDataSetID is a subclass of AbstractID, which is already in flink-core. The client can keep the IntermediateDataSetID as an AbstractID.
>> 3. Will `TableEnvironment` implement `AutoCloseable`?
> The TableEnvironment will implement AutoCloseable. However, it is still the users’ responsibility to use the try-with-resources pattern; otherwise, they need to call the close method at the end.
>> 4. "The TableEnvironment will fell back and resubmit the original DAG
>> without using the cache."
> The CatalogManager in the TableEnvironment should have the original operation tree of the cached node so that we can resubmit the original DAG. And the whole process should be transparent to the user, as stated in the FLIP.
>> 5. "tables is submitted to in Per-Job mode, the returned
>> ClusterPartitionDescriptors will be ignored so that the planner will not
>> attempt to replace the subtree"
>> How do you imagine that? Where do you distinguish between per-job and
>> session mode?
> The StreamExecutionEnvironment can distinguish between per-job and session mode by the type of the PipelineExecutor, i.e., AbstractJobClusterExecutor vs AbstractSessionClusterExecutor.
>
> An implementation plan is added at the end of the FLIP as well. Please have a look.
>
> Best,
> Xuannan
> On Sep 4, 2020, 7:29 PM +0800, Timo Walther wrote:
>> Hi Xuannan,
>>
>> thanks for the update. Here is some final feedback from my side. Maybe
>> others have some final feedback as well before we proceed to a vote?
>>
>> Some mistakes that we should fix in the FLIP:
>> - The FLIP declares `Table cache();` but I guess this should be
>> `CachedTable cache();` now.
>> - The FLIP mentions `JobGraphGenerator` a couple of times but I guess we
>> are talking about the StreamingJobGraphGenerator.
>>
>> I know that the FLIP is already quite large, but it would be good to get
>> a bit more information about:
>>
>> 1. What does `ClusterPartitionDescriptor` look like?
>>
>> 2. Will we move `IntermediateDataSetID` and `ClusterPartitionDescriptor`
>> to flink-core?
>>
>> 3. Will `TableEnvironment` implement `AutoCloseable`?
>>
>> 4. "The TableEnvironment will fell back and resubmit the original DAG
>> without using the cache."
>>
>> How do you imagine that? I guess the user needs to re-trigger the
>> execution, right?
>>
>> 5. "tables is submitted to in Per-Job mode, the returned
>> ClusterPartitionDescriptors will be ignored so that the planner will not
>> attempt to replace the subtree"
>>
>> How do you imagine that? Where do you distinguish between per-job and
>> session mode?
>>
>> 6. You mentioned in your last mail "SQL integration in the future work
>> section."
>>
>> I couldn't find this in the FLIP. Is the online FLIP the newest version?
>>
>> 7. Could you add an Implementation Plan section? It should explain how
>> this topic is split into the subtasks and potential PRs. Ideally, those
>> subtasks are split by layer to be reviewed by the individual component
>> teams.
>>
>> Thanks,
>> Timo
>>
>>
>>
>> On 25.08.20 12:45, Xuannan Su wrote:
>>> Hi Timo,
>>>
>>> Thanks for your comments. After the offline discussion, I have updated the FLIP with the following changes.
>>>
>>> 1. Update the end to end process
>>> a. The Table.cache method should only wrap the original query operation with a CacheOperation.
>>> b. The planner will add the CacheSink or replace the subtree with a CacheSource while translating the QueryOperation to a rel node. The CacheSink and CacheSource are treated as regular sinks and sources while the planner optimizes and translates the rel nodes.
>>> c. The StreamGraphGenerator will recognize the CacheSink and CacheSource and generate the StreamGraph accordingly. When it sees the CacheSink, it will remove the CacheSink and set the cache flag in the StreamNode. When it sees the CacheSource, it will include the ClusterPartitionDescriptor in the StreamNode.
>>> d. The JobGraph generator will not modify the graph. It only sets the result partition type of the intermediate result to BLOCKING_PERSISTENT if it sees a StreamNode with the cache flag set, or passes the ClusterPartitionDescriptor from the StreamNode to the JobVertex if the StreamNode carries one.
>>> e. The ClusterPartitionDescriptor will be included in the JobResult and sent back to the client.
>>> 2. The metadata of the cached table will be stored in the CatalogManager instead of the TableEnvironment
>>> 3. The Table.cache method returns a CachedTable, which is a subclass of Table.
>>> 4. Add a paragraph to explain that cached tables will not work on a Per-Job Mode cluster.
>>> 5. Add SQL integration and Cache Eviction in the future work section.
>>>
>>> As for the @Public and @PublicEvolving interfaces, I think they should be covered in the public interface section, i.e., Table and TableEnvironment. Other than those, all the changes should only affect internal classes.
>>>
>>> Please let me know if you have any further comments.
>>>
>>> Best,
>>> Xuannan
>>> On Aug 10, 2020, 9:32 PM +0800, Timo Walther <[hidden email]>, wrote:
>>>> Hi Xuannan,
>>>>
>>>> sorry for joining the discussion so late. I agree that this is a very
>>>> nice and useful feature. However, its impact on many components
>>>> in the stack requires more discussion in my opinion.
>>>>
>>>> 1) Separation of concerns:
>>>> The current design seems to mix different layers. We should make sure
>>>> that all layers do what they are supposed to do:
>>>>
>>>> 1a) The FLIP states: "The cache() method returns a new Table object with
>>>> a flag set."
>>>>
>>>> The `Table` object is just a thin API class that wraps a
>>>> `QueryOperation`. Other than that the `Table` object should not contain
>>>> further state. The tree of `QueryOperation` should be an immutable,
>>>> independent data structure that can be passed around and will eventually
>>>> be passed to the `Planner`.
>>>>
>>>> The mentioned `CacheSink` should be added by the optimizer. It is not
>>>> the responsibility of the API to perform optimizer-like tasks. A call to
>>>> `t1.cache()` should simply wrap the original operation into something
>>>> like `CacheOperation(t1.getQueryOperation)`. A `CacheOperation` could
>>>> still extend from `QueryOperation` and assign a unique string identifier
>>>> already. A specialized `StreamTransformation` would be necessary during
>>>> translation.
>>>>
>>>> 1b) The FLIP states: "The default table service stores the metadata in
>>>> the client (e.g. TableEnvironment)"
>>>>
>>>> `TableEnvironment` is not a client. Similar to `Table`, it is an API
>>>> class that just delegates to other session components. Currently, the
>>>> table environment has (again) too many responsibilities that should
>>>> better be split into other components. The table's `Executor` is the
>>>> client that performs the cluster communication. But in general it also
>>>> just delegates to `org.apache.flink.core.execution.PipelineExecutor`.
>>>> IMO the `PipelineExecutor` is a better fit for a back-and-forth
>>>> communication to determine existing cluster partitions and modify the
>>>> job graph. Or even further down the stack, because as far as I know,
>>>> `PipelineExecutor` works with `StreamGraph`.
>>>>
>>>> `flink-table-api-java` has no dependency on `flink-runtime`. This has
>>>> been done on purpose.
>>>>
>>>> 2) API
>>>> I still see the rejected option 2 as a good fit to expose this feature.
>>>>
>>>> A `Table.cache(): CachedTable` with `CachedTable.invalidate(): void` and
>>>> maybe `CachedTable.getId(): String` makes the feature and its operations
>>>> very explicit. It also avoids follow-up questions such as:
>>>>
>>>> Will `invalidateCache()` be transitively propagated in
>>>> `t1.cache().join(t2.cache()).invalidateCache()`?
>>>>
>>>> Or as the FLIP states:
>>>>
>>>> `Table t3 = t1.select(...) // cache will NOT be used.`
>>>> but
>>>> `t1.invalidateCache() // cache will be released`
>>>>
>>>> This sounds a bit contradictory to me, because sometimes
>>>> `t1.cache()` has implications on t1 and sometimes not.
>>>>
>>>> 3) Big picture
>>>>
>>>> After reading the FLIP, I still don't understand how a user can
>>>> configure or control the table service. Will we offer options through
>>>> `TableConfig` or `TableEnvironment` or is this configuration done via
>>>> ConfigOptions for lower layers?
>>>>
>>>> 4) SQL integration
>>>>
>>>> As I mentioned earlier, we should think about a SQL integration as well.
>>>> Otherwise we need to redesign the Java API to align it with SQL later.
>>>> SQL has also a bigger user base than Table API. Let's assume we
>>>> introduce a new keyword and combine the caching with regular CREATE VIEW
>>>> syntax such as:
>>>> `CREATE CACHED TEMPORARY VIEW MyTable AS SELECT *`
>>>> This would align well with:
>>>> `tEnv.registerTemporaryView("MyTable", table.cache())`
>>>>
>>>> What do you think?
>>>>
>>>> 5) TableEnvironment.close()
>>>>
>>>> Will `TableEnvironment` implement `AutoCloseable`?
>>>>
>>>>
>>>> In summary, I think the FLIP should go into more detail on how this effort
>>>> affects each layer, because a lot of the interfaces are `@Public` or
>>>> `@PublicEvolving`. The FLIP also still leaves many questions open about how
>>>> this high-level concept ends up in the JobGraph.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>>
>>>> On 30.07.20 09:00, Xuannan Su wrote:
>>>>> Hi folks,
>>>>>
>>>>> It seems that all the raised concerns so far have been resolved. I plan to start a voting thread for FLIP-36 early next week if there are no comments.
>>>>>
>>>>> Thanks,
>>>>> Xuannan
>>>>> On Jul 28, 2020, 7:42 PM +0800, Xuannan Su <[hidden email]>, wrote:
>>>>>> Hi Kurt,
>>>>>>
>>>>>> Thanks for the comments.
>>>>>>
>>>>>> You are right that the FLIP lacks a proper discussion about the impact of the optimizer. I have added the section to talk about how the cache table works with the optimizer. I hope this could resolve your concern. Please let me know if you have any further comments.
>>>>>>
>>>>>> Best,
>>>>>> Xuannan
>>>>>> On Jul 22, 2020, 4:36 PM +0800, Kurt Young <[hidden email]>, wrote:
>>>>>>> Thanks for the reply. I have one more comment about the impact on the
>>>>>>> optimizer. Even if you are trying to make the cached table as orthogonal
>>>>>>> to the optimizer as possible by introducing a special sink, it is still
>>>>>>> not clear why this approach is safe. Maybe you can add a walkthrough of
>>>>>>> the process from API to JobGraph; otherwise I can't be sure that everyone
>>>>>>> reviewing the design doc will picture it the same way. I'm also quite
>>>>>>> sure some of the existing mechanisms will be affected by this special
>>>>>>> sink, e.g. multi-sink optimization.
>>>>>>>
>>>>>>> Best,
>>>>>>> Kurt
>>>>>>>
>>>>>>>
>>>>>>> On Wed, Jul 22, 2020 at 2:31 PM Xuannan Su <[hidden email]> wrote:
>>>>>>>
>>>>>>>> Hi Kurt,
>>>>>>>>
>>>>>>>> Thanks for the comments.
>>>>>>>>
>>>>>>>> 1. How do you identify the CachedTable?
>>>>>>>> For the current design proposed in FLIP-36, we are using the first
>>>>>>>> approach you mentioned, where the key of the map is the Cached Table java
>>>>>>>> object. I think it is fine not to be able to identify another table
>>>>>>>> representing the same DAG and not using the cached intermediate result
>>>>>>>> because we want to make the caching table explicit. As mentioned in the
>>>>>>>> FLIP, the cache API will return a Table object. And the user has to use the
>>>>>>>> returned Table object to make use of the cached table. The rationale is
>>>>>>>> that if the user builds the same DAG from scratch with some
>>>>>>>> TableEnvironment instead of using the cached table object, the user
>>>>>>>> probably doesn't want to use the cache.
>>>>>>>>
>>>>>>>> 2. How does the CachedTable affect the optimizer?
>>>>>>>> We try to make the logic dealing with the cached table be as orthogonal to
>>>>>>>> the optimizer as possible. That's why we introduce a special sink when we
>>>>>>>> are going to cache a table and a special source when we are going to use a
>>>>>>>> cached table. This way, we can let the optimizer do its work, and the
>>>>>>>> logic of modifying the job graph can happen in the job graph generator. We
>>>>>>>> can recognize the cached node with the special sink and source.
>>>>>>>>
>>>>>>>> 3. What's the effect of calling TableEnvironment.close()?
>>>>>>>> We introduce the close method to prevent leaking of the cached table when
>>>>>>>> the user is done with the table environment. Therefore, it makes more sense
>>>>>>>> that the table environment, including all of its functionality, should not
>>>>>>>> be used after closing. Otherwise, we should rename the close method to
>>>>>>>> clearAllCache or something similar.
>>>>>>>>
>>>>>>>> And thanks for pointing out the non-existent API used in the given
>>>>>>>> examples. I have updated the examples in the FLIP accordingly.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Xuannan
>>>>>>>> On Jul 16, 2020, 4:15 PM +0800, Kurt Young <[hidden email]>, wrote:
>>>>>>>>> Hi Xuannan,
>>>>>>>>>
>>>>>>>>> Thanks for the detailed design doc. It clearly describes how the API
>>>>>>>>> looks and how it interacts with the Flink runtime. However, the part
>>>>>>>>> that relates to SQL's optimizer is somewhat blurry. To be more precise,
>>>>>>>>> I have the following questions:
>>>>>>>>>
>>>>>>>>> 1. How do you identify the CachedTable? I imagine there would be a map
>>>>>>>>> representing the cache; how do you compare the keys of the map? One
>>>>>>>>> approach is to compare them as Java objects, which is simple but has
>>>>>>>>> limited scope. For example, if users create another table through some
>>>>>>>>> interfaces of TableEnvironment and that table is exactly the same as the
>>>>>>>>> cached one, you won't be able to identify it. Another choice is
>>>>>>>>> calculating a "signature" or "digest" of the cached table, which involves
>>>>>>>>> a string representation of the whole subtree represented by the cached
>>>>>>>>> table. I don't think Flink currently provides such a mechanism around
>>>>>>>>> Table, though.
>>>>>>>>>
>>>>>>>>> 2. How does the CachedTable affect the optimizer? Specifically, will you
>>>>>>>>> have a dedicated QueryOperation for it, and a dedicated logical and
>>>>>>>>> physical RelNode for it? I also don't see a description of how it works
>>>>>>>>> with the current optimization phases: from Operation to Calcite rel node,
>>>>>>>>> then to Flink's logical and physical nodes, which are finally translated
>>>>>>>>> to Flink's exec nodes. There are also other optimizations, such as the
>>>>>>>>> deadlock breaker and sub-plan reuse inside the optimizer, and I'm not
>>>>>>>>> sure whether the logic dealing with cached tables can be orthogonal to
>>>>>>>>> all of these. Hence I hope you can provide a more detailed description
>>>>>>>>> here.
>>>>>>>>>
>>>>>>>>> 3. What's the effect of calling TableEnvironment.close()? You already
>>>>>>>>> explained that this would drop all caches this table env has. Could you
>>>>>>>>> also explain whether other functionality still works for this table env?
>>>>>>>>> For example, can users still create/drop tables/databases/functions
>>>>>>>>> through this table env? What happens to the catalog and all temporary
>>>>>>>>> objects of this table env?
>>>>>>>>>
>>>>>>>>> One minor comment: I noticed you used some non-existent APIs in the
>>>>>>>>> examples you gave, like table.collect(), which is a little misleading.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Kurt
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jul 9, 2020 at 4:00 PM Xuannan Su <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>>> Hi folks,
>>>>>>>>>>
>>>>>>>>>> I'd like to revive the discussion about FLIP-36 Support Interactive
>>>>>>>>>> Programming in Flink Table API
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>>>>>>>
>>>>>>>>>> The FLIP proposes to add support for interactive programming in Flink
>>>>>>>>>> Table API. Specifically, it lets users cache intermediate results
>>>>>>>>>> (tables) and use them in later jobs to avoid recomputing them.
>>>>>>>>>>
>>>>>>>>>> I am looking forward to any opinions and suggestions from the
>>>>>>>> community.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Xuannan
>>>>>>>>>> On May 7, 2020, 5:40 PM +0800, Xuannan Su <[hidden email]>,
>>>>>>>> wrote:
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> There is some feedback from @Timo and @Kurt in the voting thread for
>>>>>>>>>>> FLIP-36, and I want to share my thoughts here.
>>>>>>>>>>>
>>>>>>>>>>> 1. What would FLIP-36 look like after FLIP-84?
>>>>>>>>>>> I don't think FLIP-84 will affect FLIP-36 from the public API
>>>>>>>>>>> perspective. Users can call .cache on a table object, and the cached
>>>>>>>>>>> table will be generated whenever the table job is triggered to execute,
>>>>>>>>>>> either by Table#executeInsert or StatementSet#execute. I think that
>>>>>>>>>>> FLIP-36 should be aware of the changes made by FLIP-84, but it shouldn't
>>>>>>>>>>> be a problem. At the end of the day, FLIP-36 only requires the ability
>>>>>>>>>>> to add a sink to a node, submit a table job with multiple sinks, and
>>>>>>>>>>> replace the cached table with a source.
>>>>>>>>>>>
>>>>>>>>>>> 2. How can we support cache in a multi-statement SQL file?
>>>>>>>>>>> The most intuitive way to support cache in a multi-statement SQL file is
>>>>>>>>>>> by using a view, where the view corresponds to a cached table.
>>>>>>>>>>>
>>>>>>>>>>> 3. Unifying the cached table and materialized views
>>>>>>>>>>> It is true that the cached table and the materialized view are similar
>>>>>>>>>>> in some ways. However, I think the materialized view is a more complex
>>>>>>>>>>> concept. First, a materialized view requires some kind of refresh
>>>>>>>>>>> mechanism to synchronize with the table. Second, the life cycle of a
>>>>>>>>>>> materialized view is longer. The materialized view should be accessible
>>>>>>>>>>> even after the application exits and should be accessible by another
>>>>>>>>>>> application, while the cached table is only accessible in the
>>>>>>>>>>> application where it is created. The cached table is introduced to
>>>>>>>>>>> avoid recomputation of an intermediate table to support interactive
>>>>>>>>>>> programming in Flink Table API. And I think the materialized view needs
>>>>>>>>>>> more discussion and certainly deserves a whole new FLIP.
>>>>>>>>>>>
>>>>>>>>>>> Please let me know your thoughts.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Xuannan
>>>>>>>>>>>
>>>>>>>>>> On Wed, Apr 29, 2020 at 3:53 PM Xuannan Su <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>>>> Hi folks,
>>>>>>>>>>>
>>>>>>>>>>> The FLIP-36 is updated according to the discussion with Becket. In
>>>>>>>> the
>>>>>>>>>> meantime, any comments are very welcome.
>>>>>>>>>>>
>>>>>>>>>>> If there are no further comments, I would like to start the voting
>>>>>>>>>>> thread by tomorrow.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Xuannan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 26, 2020 at 9:34 AM Xuannan Su <[hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>
>>>>>>>>>>>>> You are right. It makes sense to treat the retry of job 2 as an
>>>>>>>>>>>>> ordinary job, and the config does introduce some unnecessary
>>>>>>>>>>>>> confusion. Thank you for your comment. I will update the FLIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Xuannan
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 25, 2020 at 7:44 AM Becket Qin <
>>>>>>>> [hidden email]>
>>>>>>>>>> wrote:
>>>>>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Suppose a user submits job 1, which generates a cached intermediate
>>>>>>>>>>>>>>> result, and later submits job 2, which should ideally use the
>>>>>>>>>>>>>>> intermediate result. In that case, if job 2 fails because the
>>>>>>>>>>>>>>> intermediate result is missing, job 2 should be retried with its full
>>>>>>>>>>>>>>> DAG. When job 2 then runs, it will also re-generate the cache.
>>>>>>>>>>>>>>> However, once job 2 has fallen back to the original DAG, should it
>>>>>>>>>>>>>>> just be treated as an ordinary job that follows the recovery
>>>>>>>>>>>>>>> strategy? Having a separate configuration seems a little confusing.
>>>>>>>>>>>>>>> In other words, re-generating the cache is just a byproduct of
>>>>>>>>>>>>>>> running the full DAG of job 2, not the main purpose. It is just like
>>>>>>>>>>>>>>> when job 1 runs to generate the cache: it does not have a separate
>>>>>>>>>>>>>>> retry config to make sure the cache is generated. If it fails, it
>>>>>>>>>>>>>>> just fails like an ordinary job.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> What do you think?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 5:00 PM Xuannan Su <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The intermediate result will indeed be automatically re-generated by
>>>>>>>>>>>>>>>> resubmitting the original DAG. And that job could fail as well. In
>>>>>>>>>>>>>>>> that case, we need to decide whether we should resubmit the original
>>>>>>>>>>>>>>>> DAG to re-generate the intermediate result or give up and throw an
>>>>>>>>>>>>>>>> exception to the user. The config indicates how many resubmissions
>>>>>>>>>>>>>>>> should happen before giving up.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>> Xuannan
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 4:19 PM Becket Qin <
>>>>>>>>>> [hidden email]> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> I am not entirely sure if I understand the cases you
>>>>>>>>>> mentioned. The
>>>>>>>>>>>>>>>> users
>>>>>>>>>>>>>>>>>> can use the cached table object returned by the
>>>>>>>> .cache()
>>>>>>>>>> method in
>>>>>>>>>>>>>>>> other
>>>>>>>>>>>>>>>>>> job and it should read the intermediate result. The
>>>>>>>>>> intermediate result
>>>>>>>>>>>>>>>>> can
>>>>>>>>>>>>>>>>>> gone in the following three cases: 1. the user
>>>>>>>> explicitly
>>>>>>>>>> call the
>>>>>>>>>>>>>>>>>> invalidateCache() method 2. the TableEnvironment is
>>>>>>>> closed
>>>>>>>>>> 3. failure
>>>>>>>>>>>>>>>>>> happens on the TM. When that happens, the intermeidate
>>>>>>>>>> result will not
>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>> available unless it is re-generated.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> What confused me was why we need a *cache.retries.max* config at all.
>>>>>>>>>>>>>>>>> Shouldn't the missing intermediate result always be automatically
>>>>>>>>>>>>>>>>> re-generated if it is gone?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 3:59 PM Xuannan Su <
>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Hi Becket,
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Thanks for the comments.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Fri, Apr 24, 2020 at 9:12 AM Becket Qin <
>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Hi Xuannan,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks for picking up the FLIP. It looks good to me
>>>>>>>>>> overall. Some
>>>>>>>>>>>>>>>> quick
>>>>>>>>>>>>>>>>>>> comments / questions below:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. Do we also need changes in the Java API?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Yes, the public interface of Table and TableEnvironment
>>>>>>>>>> should be made
>>>>>>>>>>>>>>>> in
>>>>>>>>>>>>>>>>>> the Java API.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 2. What are the cases that users may want to retry
>>>>>>>>>> reading the
>>>>>>>>>>>>>>>>>> intermediate
>>>>>>>>>>>>>>>>>>> result? It seems that once the intermediate result
>>>>>>>> has
>>>>>>>>>> gone, it will
>>>>>>>>>>>>>>>>> not
>>>>>>>>>>>>>>>>>> be
>>>>>>>>>>>>>>>>>>> available later without being generated again, right?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> I am not entirely sure I understand the cases you mentioned. Users
>>>>>>>>>>>>>>>>>> can use the cached table object returned by the .cache() method in
>>>>>>>>>>>>>>>>>> another job, and it should read the intermediate result. The
>>>>>>>>>>>>>>>>>> intermediate result can be gone in the following three cases:
>>>>>>>>>>>>>>>>>> 1. the user explicitly calls the invalidateCache() method;
>>>>>>>>>>>>>>>>>> 2. the TableEnvironment is closed;
>>>>>>>>>>>>>>>>>> 3. a failure happens on the TM.
>>>>>>>>>>>>>>>>>> When that happens, the intermediate result will not be available
>>>>>>>>>>>>>>>>>> unless it is re-generated.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> 3. In the "semantic of cache() method" section, the
>>>>>>>>>> description "The
>>>>>>>>>>>>>>>>>>> semantic of the *cache() *method is a little
>>>>>>>> different
>>>>>>>>>> depending on
>>>>>>>>>>>>>>>>>> whether
>>>>>>>>>>>>>>>>>>> auto caching is enabled or not." seems not explained.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This line is actually outdated and should be removed, as we are not
>>>>>>>>>>>>>>>>>> adding the auto caching functionality in this FLIP. Auto caching
>>>>>>>>>>>>>>>>>> will be added in the future, and the semantics of cache() when auto
>>>>>>>>>>>>>>>>>> caching is enabled will be discussed in detail in a new FLIP. I will
>>>>>>>>>>>>>>>>>> remove the description to avoid further confusion.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Jiangjie (Becket) Qin
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Apr 22, 2020 at 4:00 PM Xuannan Su <
>>>>>>>>>> [hidden email]>
>>>>>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Hi folks,
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> I'd like to start the discussion about FLIP-36
>>>>>>>> Support
>>>>>>>>>> Interactive
>>>>>>>>>>>>>>>>>>>> Programming in Flink Table API
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-36%3A+Support+Interactive+Programming+in+Flink
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> The FLIP proposes to add support for interactive programming in
>>>>>>>>>>>>>>>>>>>> Flink Table API. Specifically, it lets users cache intermediate
>>>>>>>>>>>>>>>>>>>> results (tables) and use them in later jobs.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Even though the FLIP has been discussed in the past[1], it hasn't
>>>>>>>>>>>>>>>>>>>> formally passed the vote yet. Some of the design and
>>>>>>>>>>>>>>>>>>>> implementation details have to change to incorporate the cluster
>>>>>>>>>>>>>>>>>>>> partitions proposed in FLIP-67[2].
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Looking forward to your feedback.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>>>>>>>> Xuannan
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>>>>>>> https://lists.apache.org/thread.html/b372fd7b962b9f37e4dace3bc8828f6e2a2b855e56984e58bc4a413f@%3Cdev.flink.apache.org%3E
>>>>>>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-67%3A+Cluster+partitions+lifecycle
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Aljoscha Krettek-2
In reply to this post by Xuannan Su
On 10.09.20 09:00, Xuannan Su wrote:
>> How do you imagine that? Where do you distinguish between per-job and
>> session mode?
> The StreamExecutionEnvironment can distinguish between per-job and session mode by the type of the PipelineExecutor, i.e., AbstractJobClusterExecutor vs AbstractSessionClusterExecutor.

I can only comment on this last part, but we should not do instanceof
checks on the PipelineExecutor. The PipelineExecutor is an interface on
purpose, and the execution environments should not try to guess
knowledge about the executor implementations. This would introduce tight
coupling, which might break in the future if the executors were to change.

Best,
Aljoscha
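
To illustrate the coupling concern, here is a minimal sketch. Every type below is a hypothetical stand-in invented for illustration, not Flink's actual PipelineExecutor hierarchy. It shows how an instanceof check against an abstract base class can misclassify an executor that runs in per-job mode but does not extend that base class.

```java
// Hypothetical stand-ins; these are NOT Flink's real classes.
interface ExecutorSketch {
    String execute(String pipeline);
}

// Stand-in for an abstract per-job base class that the environment checks against.
abstract class AbstractJobClusterExecutorSketch implements ExecutorSketch {}

// A per-job executor that happens to extend the abstract base class.
class BuiltInPerJobExecutor extends AbstractJobClusterExecutorSketch {
    @Override
    public String execute(String pipeline) {
        return "per-job:" + pipeline;
    }
}

// A per-job executor added later that implements the interface directly.
class ThirdPartyPerJobExecutor implements ExecutorSketch {
    @Override
    public String execute(String pipeline) {
        return "per-job:" + pipeline;
    }
}

class EnvironmentSketch {
    // The environment guesses the deployment mode from the concrete type.
    static boolean looksLikePerJob(ExecutorSketch executor) {
        return executor instanceof AbstractJobClusterExecutorSketch;
    }

    public static void main(String[] args) {
        System.out.println(looksLikePerJob(new BuiltInPerJobExecutor()));
        System.out.println(looksLikePerJob(new ThirdPartyPerJobExecutor()));
    }
}
```

In this sketch both executors behave identically, yet only the one that extends the base class is recognized, which is the kind of breakage the interface was meant to prevent.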

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Aljoscha,

Thanks for your comment. I agree that we should not tightly couple the execution environment to the PipelineExecutor. With that in mind, to distinguish per-job from session mode, we can introduce a new method, named isPerJobModeExecutor, in the PipelineExecutorFactory or PipelineExecutor so that the execution environment can recognize per-job mode without instanceof checks on the PipelineExecutor. What do you think? Any thoughts or suggestions are very welcome.

Best,
Xuannan
On Sep 10, 2020, 4:51 PM +0800, Aljoscha Krettek <[hidden email]>, wrote:

> On 10.09.20 09:00, Xuannan Su wrote:
> > > How do you imagine that? Where do you distinguish between per-job and
> > > session mode?
> > The StreamExecutionEnvironment can distinguish between per-job and session mode by the type of the PipelineExecutor, i.e., AbstractJobClusterExecutor vs AbstractSessionClusterExecutor.
>
> I can just comment on this last part but we should not to instanceof
> checks on the PipelineExecutor. The PipelineExecutor is an interface on
> purpose and the execution environments should not try and guess
> knowledge about the executor implementations. This would introduce tight
> coupling which might break in the future if the executors were to change.
>
> Best,
> Aljoscha

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Aljoscha Krettek-2
On 15.09.20 07:00, Xuannan Su wrote:
> Thanks for your comment. I agree that we should not introduce tight coupling between the execution environment and the PipelineExecutor. With that in mind, to distinguish between per-job and session mode, we can introduce a new method, named isPerJobModeExecutor, in the PipelineExecutorFactory or PipelineExecutor, so that the execution environment can recognize per-job mode without instanceof checks on the PipelineExecutor. What do you think? Any thoughts or suggestions are very welcome.

I think this would just sidestep the problem. Can't we just ignore it?
With per-job mode the cache will not be there and the program will fall
back to re-execute the whole graph. That shouldn't be a problem.
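
Roughly, the fallback described above could be sketched like this. The class, the exception type, and the use of Supplier to stand in for job submission are all hypothetical; this is only an illustration of "try the cached plan, otherwise re-execute the original graph", not how the FLIP implements it.

```java
import java.util.function.Supplier;

final class FallbackSketch {
    // Hypothetical exception signalling that the cached intermediate result is missing.
    static final class CacheMissingException extends RuntimeException {
        CacheMissingException(String message) {
            super(message);
        }
    }

    // Run the plan that reads from the cache; if the cache is gone,
    // fall back to the plan built from the original operation tree.
    static <T> T executeWithFallback(Supplier<T> cachedPlan, Supplier<T> originalPlan) {
        try {
            return cachedPlan.get();
        } catch (CacheMissingException e) {
            return originalPlan.get();
        }
    }

    public static void main(String[] args) {
        String result = executeWithFallback(
                () -> { throw new CacheMissingException("no cached result in this cluster"); },
                () -> "recomputed from original DAG");
        System.out.println(result);
    }
}
```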

Best,
Aljoscha


Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi Aljoscha,

I thought about relying on the failover mechanism to re-execute the whole graph when the cache doesn’t exist. My only concern is that every job that uses the cached table in a per-job cluster would have to go through the following process, which might not be ideal:
job submit -> job fails because the intermediate result doesn’t exist -> fall back to the original DAG -> job submit
One way of solving this is to let the CatalogManager probe the existence of the IntermediateResult so that the planner can decide if the cache table should be used.
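
A minimal sketch of that probing idea, assuming a simple in-memory registry of available intermediate results. The class and method names are made up for illustration and are not part of the CatalogManager API.

```java
import java.util.HashSet;
import java.util.Set;

final class CacheProbeSketch {
    // Hypothetical record of intermediate results known to exist on the cluster.
    private final Set<String> availableResults = new HashSet<>();

    void register(String intermediateResultId) {
        availableResults.add(intermediateResultId);
    }

    // The probe: does the intermediate result exist right now?
    boolean exists(String intermediateResultId) {
        return availableResults.contains(intermediateResultId);
    }

    // Planner-side decision made before submission, so no job has to fail first.
    String choosePlan(String intermediateResultId) {
        return exists(intermediateResultId)
                ? "scan-cache:" + intermediateResultId
                : "original-dag";
    }

    public static void main(String[] args) {
        CacheProbeSketch catalog = new CacheProbeSketch();
        System.out.println(catalog.choosePlan("result-1"));
        catalog.register("result-1");
        System.out.println(catalog.choosePlan("result-1"));
    }
}
```

With the check done at planning time, a per-job cluster would submit the original DAG directly instead of going through a failed submission first.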

Best,
Xuannan

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Aljoscha Krettek-2
On 15.09.20 10:54, Xuannan Su wrote:
> One way of solving this is to let the CatalogManager probe the existence of the IntermediateResult so that the planner can decide if the cache table should be used.

That could be a reasonable solution, yes.

Best,
Aljoscha

Re: [DISCUSS] FLIP-36 - Support Interactive Programming in Flink Table API

Xuannan Su
Hi everyone,

@Aljoscha, I have updated the Per-Job Mode section of the FLIP.

It seems that the people involved in the discussion have reached a consensus. If there are no more comments, I would like to start the voting thread tomorrow.

Best,
Xuannan