Hi folks,
I would like to start the FLIP discussion thread about the pluggable intermediate result storage. This is phase 2 of FLIP-36: Support Interactive Programming in Flink Skip to end of metadata. While the FLIP-36 provides a default implementation of the intermediate result storage using the shuffle service, we would like to make the intermediate result storage pluggable so that the user can easily swap the storage. We are looking forward to your thought! The FLIP link is the following: https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage <https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage> Best, Xuannan |
Sorry for the late response. So many FLIPs these days.
I am a bit unsure about the motivation here, and that this need to be a part of Flink. It sounds like this can be perfectly built around Flink as a minimal library on top of it, without any change in the core APIs or runtime. The proposal to handle "caching intermediate results" (to make them reusable across jobs in a session), and "writing them in different formats / indexing them" doesn't sound like it should be the same mechanism. - The caching part is a transparent low-level primitive. It avoid re-executing a part of the job graph, but otherwise is completely transparent to the consumer job. - Writing data out in a sink, compressing/indexing it and then reading it in another job is also a way of reusing a previous result, but on a completely different abstraction level. It is not the same intermediate result any more. When the consumer reads from it and applies predicate pushdown, etc. then the consumer job looks completely different from a job that consumed the original result. It hence needs to be solved on the API level via a sink and a source. I would suggest to keep these concepts separate: Caching (possibly automatically) for jobs in a session, and long term writing/sharing of data sets. Solving the "long term writing/sharing" in a library rather than in the runtime also has the advantage of not pushing yet more stuff into Flink's core, which I believe is also an important criterion. Best, Stephan On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <[hidden email]> wrote: > Hi folks, > > I would like to start the FLIP discussion thread about the pluggable > intermediate result storage. > > This is phase 2 of FLIP-36: Support Interactive Programming in Flink Skip > to end of metadata. While the FLIP-36 provides a default implementation of > the intermediate result storage using the shuffle service, we would like to > make the intermediate result storage pluggable so that the user can easily > swap the storage. > > We are looking forward to your thought! > > The FLIP link is the following: > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage > < > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage > > > > Best, > Xuannan > |
Hi Stephan,
Sorry for the belated reply. You are right that the functionality proposed in this FLIP can be implemented out of the Flink core as an ecosystem project. The main motivation of this FLIP is two folds: 1. Improve the performance of intermediate result sharing in the same session. Using the internal shuffle service to store cached result has two potential performance problems. a) the cached intermediate results may break the operator chaining due to the addition of BLOCKING_PERSISTENT edge. b) the downstream processor must read all the records in intermediate results to process. A pluggable intermediate result storage will help address both of the problem. Adding a sink will not break chaining, but just ensure cached logical node will not be optimized away. The pluggable storage can help improve the performance by making the intermediate results filterable / projectable, etc. Alternatively we can make the shuffle service more sophisticated, but it may complicate things and is not necessary for the normal shuffles. This motivation seems difficult to be addressed as an external library on top of Flink core, mainly because the in-session intermediate result cleanup may need participation of RM to achieve fault tolerance. Also, having an external library essentially introduces another way to cache the in-session intermediate results. 2. Cross session intermediate result sharing. As you said, this can be implemented as an external library. The only difference is that users may need to deal with another set of API, but that seems OK. So for this FLIP, it would be good to see whether we think motivation 1 is worth addressing or not. What do you think? Thanks, Jiangjie (Becket) Qin On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen <[hidden email]> wrote: > Sorry for the late response. So many FLIPs these days. > > I am a bit unsure about the motivation here, and that this need to be a > part of Flink. It sounds like this can be perfectly built around Flink as a > minimal library on top of it, without any change in the core APIs or > runtime. > > The proposal to handle "caching intermediate results" (to make them > reusable across jobs in a session), and "writing them in different formats > / indexing them" doesn't sound like it should be the same mechanism. > > - The caching part is a transparent low-level primitive. It avoid > re-executing a part of the job graph, but otherwise is completely > transparent to the consumer job. > > - Writing data out in a sink, compressing/indexing it and then reading it > in another job is also a way of reusing a previous result, but on a > completely different abstraction level. It is not the same intermediate > result any more. When the consumer reads from it and applies predicate > pushdown, etc. then the consumer job looks completely different from a job > that consumed the original result. It hence needs to be solved on the API > level via a sink and a source. > > I would suggest to keep these concepts separate: Caching (possibly > automatically) for jobs in a session, and long term writing/sharing of data > sets. > > Solving the "long term writing/sharing" in a library rather than in the > runtime also has the advantage of not pushing yet more stuff into Flink's > core, which I believe is also an important criterion. > > Best, > Stephan > > > On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <[hidden email]> wrote: > > > Hi folks, > > > > I would like to start the FLIP discussion thread about the pluggable > > intermediate result storage. > > > > This is phase 2 of FLIP-36: Support Interactive Programming in Flink Skip > > to end of metadata. While the FLIP-36 provides a default implementation > of > > the intermediate result storage using the shuffle service, we would like > to > > make the intermediate result storage pluggable so that the user can > easily > > swap the storage. > > > > We are looking forward to your thought! > > > > The FLIP link is the following: > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage > > < > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage > > > > > > > Best, > > Xuannan > > > |
## About the improvements you mentioned in (1)
- I am not sure that this helps to improve performance by avoiding to break the pipeline. Attaching an additional sink, would in virtually any case add even more overhead than the pipeline breaking. What is your reasoning why it would be faster, all in all? - About reading only a subset of the records: - If this is about reading the data once or twice, then columnarizing/indexing/compressing the data is more expensive than just reading it twice more. - This means turning the mechanism into something like materialized view matching, rather than result caching. That should happen in different parts of the stack (view matching needs schema, semantics, etc.). I am not sure mixing both is even a good idea. ## The way I see the trade-offs are: Pro in core Flink: - Small improvement to API experience, compared to a library Contra in core Flink: - added API complexity, maintenance and evolution overhead - not clear what impacts mixing materialized view matching and result caching has on the system architecture - Not yet a frequent use case, possibly a frequent use case in the future. - Starting as a library allows for merging into the core later when this use case becomes major and experience improvement proves big. Unclear - is breaking the pipeline by introducing a blocking intermediate result really worse than duplicating the data into an additional sink? ==> Especially because so we can still make it part of Flink later once the use case and approach are a bit more fleshed out, this looks like a strong case for starting with a library approach here. Best, Stephan On Thu, Sep 19, 2019 at 2:41 AM Becket Qin <[hidden email]> wrote: > Hi Stephan, > > Sorry for the belated reply. You are right that the functionality proposed > in this FLIP can be implemented out of the Flink core as an ecosystem > project. > > The main motivation of this FLIP is two folds: > > 1. Improve the performance of intermediate result sharing in the same > session. > Using the internal shuffle service to store cached result has two potential > performance problems. > a) the cached intermediate results may break the operator chaining due to > the addition of BLOCKING_PERSISTENT edge. > b) the downstream processor must read all the records in intermediate > results to process. > > A pluggable intermediate result storage will help address both of the > problem. Adding a sink will not break chaining, but just ensure cached > logical node will not be optimized away. The pluggable storage can help > improve the performance by making the intermediate results filterable / > projectable, etc. Alternatively we can make the shuffle service more > sophisticated, but it may complicate things and is not necessary for the > normal shuffles. > > This motivation seems difficult to be addressed as an external library on > top of Flink core, mainly because the in-session intermediate result > cleanup may need participation of RM to achieve fault tolerance. Also, > having an external library essentially introduces another way to cache the > in-session intermediate results. > > 2. Cross session intermediate result sharing. > As you said, this can be implemented as an external library. The only > difference is that users may need to deal with another set of API, but that > seems OK. > > > So for this FLIP, it would be good to see whether we think motivation 1 is > worth addressing or not. > > What do you think? > > Thanks, > > Jiangjie (Becket) Qin > > On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen <[hidden email]> wrote: > > > Sorry for the late response. So many FLIPs these days. > > > > I am a bit unsure about the motivation here, and that this need to be a > > part of Flink. It sounds like this can be perfectly built around Flink > as a > > minimal library on top of it, without any change in the core APIs or > > runtime. > > > > The proposal to handle "caching intermediate results" (to make them > > reusable across jobs in a session), and "writing them in different > formats > > / indexing them" doesn't sound like it should be the same mechanism. > > > > - The caching part is a transparent low-level primitive. It avoid > > re-executing a part of the job graph, but otherwise is completely > > transparent to the consumer job. > > > > - Writing data out in a sink, compressing/indexing it and then reading > it > > in another job is also a way of reusing a previous result, but on a > > completely different abstraction level. It is not the same intermediate > > result any more. When the consumer reads from it and applies predicate > > pushdown, etc. then the consumer job looks completely different from a > job > > that consumed the original result. It hence needs to be solved on the API > > level via a sink and a source. > > > > I would suggest to keep these concepts separate: Caching (possibly > > automatically) for jobs in a session, and long term writing/sharing of > data > > sets. > > > > Solving the "long term writing/sharing" in a library rather than in the > > runtime also has the advantage of not pushing yet more stuff into Flink's > > core, which I believe is also an important criterion. > > > > Best, > > Stephan > > > > > > On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <[hidden email]> > wrote: > > > > > Hi folks, > > > > > > I would like to start the FLIP discussion thread about the pluggable > > > intermediate result storage. > > > > > > This is phase 2 of FLIP-36: Support Interactive Programming in Flink > Skip > > > to end of metadata. While the FLIP-36 provides a default implementation > > of > > > the intermediate result storage using the shuffle service, we would > like > > to > > > make the intermediate result storage pluggable so that the user can > > easily > > > swap the storage. > > > > > > We are looking forward to your thought! > > > > > > The FLIP link is the following: > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage > > > < > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage > > > > > > > > > > Best, > > > Xuannan > > > > > > |
Hi Stephan,
In terms of the performance concern, please see my understanding below. ## Breaking the pipeline v.s. adding a sink. If two operators are initially chained, they will belong to the same stage in the DAG and the same task, therefore the main processing path will just have one task without serde in the middle. I was trying to see the overhead of adding a new sink or breaking the pipeline. - Adding a new sink introduces serialization cost, and potentially network IO if the sink writes to a remote storage instead of local file system. - Breaking the pipeline introduces a new stage, a new task, additional serialization / deserialization cost and potential network IO. Therefore I thought that adding a new sink will have better performance than breaking the pipeline because it has lower cost in general. Please let me know if I missed something. The above scenarios assume that users want to cache the result in the middle of an operator chain, but not at the shuffle boundary. If the cache is at the shuffle boundary, it would duplicate the records unless the pluggable shuffle service is also the pluggable intermediate result storage at the same time. In that case, there will be just one copy of the records, but could be read by either the pluggable shuffle service or the pluggable intermediate result storage. ## Reading a subset of record You are right. Any additional indexing / compression / columnizing of the raw intermediate result introduces overhead. So it only makes sense if the saving is greater than the overhead. One such example is iteration. In that case, the cached intermediate results may be read for some undefined times and the initial overhead of columnizing would be worthwhile. In general, I am with you that this could be put in an external library. It is achievable if we only address the cross-session intermediate result sharing. However, an external library is not sufficient to provide optimized in-session intermediate result sharing. This is mainly because when the job exits, RM needs to clean up the intermediate results. So basically we are choosing between the following two options. Option 1: in-session sharing is only served by shuffle service, no special performance optimization. Option 2: In-session sharing is served by shuffle service by default, performance optimization can be provided by pluggable intermediate result storage. It would be helpful for us to first agree on whether we want to have performance optimization for in-session intermediate result sharing? If not, option 1 is good enough. Otherwise, we would need something pluggable for the in-session intermediate result. Thoughts? Thanks, Jiangjie (Becket) Qin On Sun, Sep 22, 2019 at 8:44 PM Stephan Ewen <[hidden email]> wrote: > ## About the improvements you mentioned in (1) > > - I am not sure that this helps to improve performance by avoiding to > break the pipeline. > Attaching an additional sink, would in virtually any case add even more > overhead than the pipeline breaking. > What is your reasoning why it would be faster, all in all? > > - About reading only a subset of the records: > - If this is about reading the data once or twice, then > columnarizing/indexing/compressing the data is more expensive than just > reading it twice more. > - This means turning the mechanism into something like materialized > view matching, rather than result caching. That should happen in different > parts of the stack (view matching needs schema, semantics, etc.). I am not > sure mixing both is even a good idea. > > > ## The way I see the trade-offs are: > > Pro in core Flink: > - Small improvement to API experience, compared to a library > > Contra in core Flink: > - added API complexity, maintenance and evolution overhead > - not clear what impacts mixing materialized view matching and result > caching has on the system architecture > - Not yet a frequent use case, possibly a frequent use case in the > future. > - Starting as a library allows for merging into the core later when this > use case becomes major and experience improvement proves big. > > Unclear > - is breaking the pipeline by introducing a blocking intermediate result > really worse than duplicating the data into an additional sink? > > > ==> Especially because so we can still make it part of Flink later once the > use case and approach are a bit more fleshed out, this looks like a strong > case for starting with a library approach here. > > Best, > Stephan > > > > On Thu, Sep 19, 2019 at 2:41 AM Becket Qin <[hidden email]> wrote: > > > Hi Stephan, > > > > Sorry for the belated reply. You are right that the functionality > proposed > > in this FLIP can be implemented out of the Flink core as an ecosystem > > project. > > > > The main motivation of this FLIP is two folds: > > > > 1. Improve the performance of intermediate result sharing in the same > > session. > > Using the internal shuffle service to store cached result has two > potential > > performance problems. > > a) the cached intermediate results may break the operator chaining due > to > > the addition of BLOCKING_PERSISTENT edge. > > b) the downstream processor must read all the records in intermediate > > results to process. > > > > A pluggable intermediate result storage will help address both of the > > problem. Adding a sink will not break chaining, but just ensure cached > > logical node will not be optimized away. The pluggable storage can help > > improve the performance by making the intermediate results filterable / > > projectable, etc. Alternatively we can make the shuffle service more > > sophisticated, but it may complicate things and is not necessary for the > > normal shuffles. > > > > This motivation seems difficult to be addressed as an external library on > > top of Flink core, mainly because the in-session intermediate result > > cleanup may need participation of RM to achieve fault tolerance. Also, > > having an external library essentially introduces another way to cache > the > > in-session intermediate results. > > > > 2. Cross session intermediate result sharing. > > As you said, this can be implemented as an external library. The only > > difference is that users may need to deal with another set of API, but > that > > seems OK. > > > > > > So for this FLIP, it would be good to see whether we think motivation 1 > is > > worth addressing or not. > > > > What do you think? > > > > Thanks, > > > > Jiangjie (Becket) Qin > > > > On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen <[hidden email]> wrote: > > > > > Sorry for the late response. So many FLIPs these days. > > > > > > I am a bit unsure about the motivation here, and that this need to be a > > > part of Flink. It sounds like this can be perfectly built around Flink > > as a > > > minimal library on top of it, without any change in the core APIs or > > > runtime. > > > > > > The proposal to handle "caching intermediate results" (to make them > > > reusable across jobs in a session), and "writing them in different > > formats > > > / indexing them" doesn't sound like it should be the same mechanism. > > > > > > - The caching part is a transparent low-level primitive. It avoid > > > re-executing a part of the job graph, but otherwise is completely > > > transparent to the consumer job. > > > > > > - Writing data out in a sink, compressing/indexing it and then > reading > > it > > > in another job is also a way of reusing a previous result, but on a > > > completely different abstraction level. It is not the same intermediate > > > result any more. When the consumer reads from it and applies predicate > > > pushdown, etc. then the consumer job looks completely different from a > > job > > > that consumed the original result. It hence needs to be solved on the > API > > > level via a sink and a source. > > > > > > I would suggest to keep these concepts separate: Caching (possibly > > > automatically) for jobs in a session, and long term writing/sharing of > > data > > > sets. > > > > > > Solving the "long term writing/sharing" in a library rather than in the > > > runtime also has the advantage of not pushing yet more stuff into > Flink's > > > core, which I believe is also an important criterion. > > > > > > Best, > > > Stephan > > > > > > > > > On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <[hidden email]> > > wrote: > > > > > > > Hi folks, > > > > > > > > I would like to start the FLIP discussion thread about the pluggable > > > > intermediate result storage. > > > > > > > > This is phase 2 of FLIP-36: Support Interactive Programming in Flink > > Skip > > > > to end of metadata. While the FLIP-36 provides a default > implementation > > > of > > > > the intermediate result storage using the shuffle service, we would > > like > > > to > > > > make the intermediate result storage pluggable so that the user can > > > easily > > > > swap the storage. > > > > > > > > We are looking forward to your thought! > > > > > > > > The FLIP link is the following: > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage > > > > < > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage > > > > > > > > > > > > > Best, > > > > Xuannan > > > > > > > > > > |
I understand your argument about performance, though I am unsure it will be
that noticeable and warrants the added complexity, Besides performance, I believe the other two arguments in the previous mails are quite important to consider: - Keeping complexity out of the core is a good goal. It is easy to start with a library and make a deeper integration later when we see the need and that there is really something missing in the user experience. - The mechanism you describe is closer to "view matching" and only applicable in the Table API with an optimizer that understands the semantics, whereas result caching is more low-level and works in the DataSet / DataStream API as well. That distinction suggests it is not exactly the same mechanism anyways (through view matching can be partiall built on top of intermediate results). Best, Stephan On Tue, Sep 24, 2019 at 2:01 AM Becket Qin <[hidden email]> wrote: > Hi Stephan, > > In terms of the performance concern, please see my understanding below. > > ## Breaking the pipeline v.s. adding a sink. > If two operators are initially chained, they will belong to the same stage > in the DAG and the same task, therefore the main processing path will just > have one task without serde in the middle. I was trying to see the overhead > of adding a new sink or breaking the pipeline. > > - Adding a new sink introduces serialization cost, and potentially > network IO if the sink writes to a remote storage instead of local file > system. > - Breaking the pipeline introduces a new stage, a new task, additional > serialization / deserialization cost and potential network IO. > > Therefore I thought that adding a new sink will have better performance > than breaking the pipeline because it has lower cost in general. > Please let me know if I missed something. > > The above scenarios assume that users want to cache the result in the > middle of an operator chain, but not at the shuffle boundary. If the cache > is at the shuffle boundary, it would duplicate the records unless the > pluggable shuffle service is also the pluggable intermediate result storage > at the same time. In that case, there will be just one copy of the records, > but could be read by either the pluggable shuffle service or the pluggable > intermediate result storage. > > ## Reading a subset of record > You are right. Any additional indexing / compression / columnizing of the > raw intermediate result introduces overhead. So it only makes sense if the > saving is greater than the overhead. One such example is iteration. In that > case, the cached intermediate results may be read for some undefined times > and the initial overhead of columnizing would be worthwhile. > > > In general, I am with you that this could be put in an external library. It > is achievable if we only address the cross-session intermediate result > sharing. However, an external library is not sufficient to provide > optimized in-session intermediate result sharing. This is mainly because > when the job exits, RM needs to clean up the intermediate results. So > basically we are choosing between the following two options. > > Option 1: in-session sharing is only served by shuffle service, no special > performance optimization. > Option 2: In-session sharing is served by shuffle service by default, > performance optimization can be provided by pluggable intermediate result > storage. > > It would be helpful for us to first agree on whether we want to have > performance optimization for in-session intermediate result sharing? If > not, option 1 is good enough. Otherwise, we would need something pluggable > for the in-session intermediate result. > > Thoughts? > > Thanks, > > Jiangjie (Becket) Qin > > > > > On Sun, Sep 22, 2019 at 8:44 PM Stephan Ewen <[hidden email]> wrote: > > > ## About the improvements you mentioned in (1) > > > > - I am not sure that this helps to improve performance by avoiding to > > break the pipeline. > > Attaching an additional sink, would in virtually any case add even > more > > overhead than the pipeline breaking. > > What is your reasoning why it would be faster, all in all? > > > > - About reading only a subset of the records: > > - If this is about reading the data once or twice, then > > columnarizing/indexing/compressing the data is more expensive than just > > reading it twice more. > > - This means turning the mechanism into something like materialized > > view matching, rather than result caching. That should happen in > different > > parts of the stack (view matching needs schema, semantics, etc.). I am > not > > sure mixing both is even a good idea. > > > > > > ## The way I see the trade-offs are: > > > > Pro in core Flink: > > - Small improvement to API experience, compared to a library > > > > Contra in core Flink: > > - added API complexity, maintenance and evolution overhead > > - not clear what impacts mixing materialized view matching and result > > caching has on the system architecture > > - Not yet a frequent use case, possibly a frequent use case in the > > future. > > - Starting as a library allows for merging into the core later when > this > > use case becomes major and experience improvement proves big. > > > > Unclear > > - is breaking the pipeline by introducing a blocking intermediate > result > > really worse than duplicating the data into an additional sink? > > > > > > ==> Especially because so we can still make it part of Flink later once > the > > use case and approach are a bit more fleshed out, this looks like a > strong > > case for starting with a library approach here. > > > > Best, > > Stephan > > > > > > > > On Thu, Sep 19, 2019 at 2:41 AM Becket Qin <[hidden email]> wrote: > > > > > Hi Stephan, > > > > > > Sorry for the belated reply. You are right that the functionality > > proposed > > > in this FLIP can be implemented out of the Flink core as an ecosystem > > > project. > > > > > > The main motivation of this FLIP is two folds: > > > > > > 1. Improve the performance of intermediate result sharing in the same > > > session. > > > Using the internal shuffle service to store cached result has two > > potential > > > performance problems. > > > a) the cached intermediate results may break the operator chaining > due > > to > > > the addition of BLOCKING_PERSISTENT edge. > > > b) the downstream processor must read all the records in intermediate > > > results to process. > > > > > > A pluggable intermediate result storage will help address both of the > > > problem. Adding a sink will not break chaining, but just ensure cached > > > logical node will not be optimized away. The pluggable storage can help > > > improve the performance by making the intermediate results filterable / > > > projectable, etc. Alternatively we can make the shuffle service more > > > sophisticated, but it may complicate things and is not necessary for > the > > > normal shuffles. > > > > > > This motivation seems difficult to be addressed as an external library > on > > > top of Flink core, mainly because the in-session intermediate result > > > cleanup may need participation of RM to achieve fault tolerance. Also, > > > having an external library essentially introduces another way to cache > > the > > > in-session intermediate results. > > > > > > 2. Cross session intermediate result sharing. > > > As you said, this can be implemented as an external library. The only > > > difference is that users may need to deal with another set of API, but > > that > > > seems OK. > > > > > > > > > So for this FLIP, it would be good to see whether we think motivation 1 > > is > > > worth addressing or not. > > > > > > What do you think? > > > > > > Thanks, > > > > > > Jiangjie (Becket) Qin > > > > > > On Thu, Aug 15, 2019 at 11:42 PM Stephan Ewen <[hidden email]> > wrote: > > > > > > > Sorry for the late response. So many FLIPs these days. > > > > > > > > I am a bit unsure about the motivation here, and that this need to > be a > > > > part of Flink. It sounds like this can be perfectly built around > Flink > > > as a > > > > minimal library on top of it, without any change in the core APIs or > > > > runtime. > > > > > > > > The proposal to handle "caching intermediate results" (to make them > > > > reusable across jobs in a session), and "writing them in different > > > formats > > > > / indexing them" doesn't sound like it should be the same mechanism. > > > > > > > > - The caching part is a transparent low-level primitive. It avoid > > > > re-executing a part of the job graph, but otherwise is completely > > > > transparent to the consumer job. > > > > > > > > - Writing data out in a sink, compressing/indexing it and then > > reading > > > it > > > > in another job is also a way of reusing a previous result, but on a > > > > completely different abstraction level. It is not the same > intermediate > > > > result any more. When the consumer reads from it and applies > predicate > > > > pushdown, etc. then the consumer job looks completely different from > a > > > job > > > > that consumed the original result. It hence needs to be solved on the > > API > > > > level via a sink and a source. > > > > > > > > I would suggest to keep these concepts separate: Caching (possibly > > > > automatically) for jobs in a session, and long term writing/sharing > of > > > data > > > > sets. > > > > > > > > Solving the "long term writing/sharing" in a library rather than in > the > > > > runtime also has the advantage of not pushing yet more stuff into > > Flink's > > > > core, which I believe is also an important criterion. > > > > > > > > Best, > > > > Stephan > > > > > > > > > > > > On Thu, Jul 25, 2019 at 4:53 AM Xuannan Su <[hidden email]> > > > wrote: > > > > > > > > > Hi folks, > > > > > > > > > > I would like to start the FLIP discussion thread about the > pluggable > > > > > intermediate result storage. > > > > > > > > > > This is phase 2 of FLIP-36: Support Interactive Programming in > Flink > > > Skip > > > > > to end of metadata. While the FLIP-36 provides a default > > implementation > > > > of > > > > > the intermediate result storage using the shuffle service, we would > > > like > > > > to > > > > > make the intermediate result storage pluggable so that the user can > > > > easily > > > > > swap the storage. > > > > > > > > > > We are looking forward to your thought! > > > > > > > > > > The FLIP link is the following: > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48%3A+Pluggable+Intermediate+Result+Storage > > > > > < > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-48:+Pluggable+Intermediate+Result+Storage > > > > > > > > > > > > > > > > Best, > > > > > Xuannan > > > > > > > > > > > > > > > |
Free forum by Nabble | Edit this page |