Hey Everyone!
Gordon and I have been discussing adding a savepoint connector to Flink for reading, writing and modifying savepoints.

This is useful for:

- Analyzing state for interesting patterns
- Troubleshooting or auditing jobs by checking for discrepancies in state
- Bootstrapping state for new applications
- Modifying savepoints, such as:
  - Changing max parallelism
  - Making breaking schema changes
  - Correcting invalid state

We are looking forward to your feedback!

This is the FLIP:

https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector

Seth
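For a feel of what this would look like in user code, here is a rough sketch of reading and bootstrapping state, loosely following the API outlined in the FLIP and the linked prototype. Everything below is illustrative - class names, uids and paths may change, and MyKeyedStateBootstrapFunction stands in for a user-defined KeyedStateBootstrapFunction:

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Reading: load an existing savepoint and pull one operator's list state out as a DataSet.
    ExistingSavepoint existing =
        Savepoint.load(env, "hdfs:///savepoints/savepoint-1", new MemoryStateBackend());
    DataSet<Long> counts = existing.readListState("my-uid", "counts", Types.LONG);

    // Writing: bootstrap keyed state from a DataSet and write it out as a new savepoint.
    BootstrapTransformation<Long> bootstrap = OperatorTransformation
        .bootstrapWith(env.fromElements(1L, 2L, 3L))
        .keyBy(v -> v)
        .transform(new MyKeyedStateBootstrapFunction()); // placeholder user function

    Savepoint
        .create(new MemoryStateBackend(), 128 /* max parallelism */)
        .withOperator("my-uid", bootstrap)
        .write("hdfs:///savepoints/bootstrapped");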
Hi Seth,
Glad to see this FLIP, big +1 for this feature!

Best,
Vino
Hi Seth,
+1 from my side.

I was wondering if we can add a reader method to provide a full view of the states instead of the state of a specific operator? It would be helpful when there is some unrestored state of a previously removed operator in the savepoint.

Best,
Paul Lam
Hi Paul,
I’m not following, could you provide an example of the kind of operation you’re describing?

Seth
Hi Seth,
Big +1 from my side. I like this idea.

IMO, it’s better to choose another FLIP name instead of ‘connector’, which is a little confusing.
Hi Seth,
Sorry for the confusion. I mean currently we need to know the operator id, state name and the state type (e.g. ListState, MapState) beforehand to get the states. Is it possible to perform a scan to get all existing operator ids or state names in the savepoint? It would be good to know what states are in the savepoint before we get to a specific state.

For example, if we analyze a savepoint created weeks ago, and the corresponding job has been modified since then, say, moved from KafkaSink to KinesisSink, we are not sure whether we have the Kafka sink states or the Kinesis sink states in the savepoint and might need to try twice to get the right one.

I’m not familiar with the savepoint formats, so pardon me if it’s a dumb question.

Best,
Paul Lam
The name "Savepoint Connector" might indeed be not that good, as it doesn't
point out the fact that with the current design, all kinds of snapshots (savepoint / full or incremental checkpoints) can be read. @Paul That would be a very valid requirement. Querying the list of existing operator ids should be straight forward, as that information is in the snapshot metadata file. However, querying state names / state structure / state type would currently be impossible without also reading the state itself, as that information isn't globally available and can only be known when each key group is being read. We could potentially make those information available in the snapshot metadata file, but that would require more work. I think that can be a next step once we have an initial version. Cheers, Gordon On Thu, May 30, 2019 at 1:21 PM Paul Lam <[hidden email]> wrote: > Hi Seth, > > Sorry for the confusion. I mean currently we need to know the operator id, > state name and the state type (eg. ListState, MapState) beforehand to get > the states. Is possible that we can perform a scan to get all existing > operator ids or state names in the savepoint? It would be good to know what > states are in the savepoint before we get to a specific state. > > For example, if we analyze a savepoint created weeks ago, and the > corresponding job has been modified since that, say, moved from KafkaSink > to KinesisSink, so we are not sure whether we have the Kafka sink states or > the Kinesis sink states in the savepoint and might need to try twice to get > the right one. > > I’m not familiar with the savepoint formats, so pardon me if it’s a dumb > question. > > Best, > Paul Lam > > 在 2019年5月30日,11:09,Seth Wiesman <[hidden email]> 写道: > > Hi Paul, > > I’m not following, could you provide and example of the kind of operation > your describing? > > Seth > > On May 29, 2019, at 7:37 PM, Paul Lam <[hidden email]> wrote: > > Hi Seth, > > +1 from my side. > > I was wondering if we can add a reader method to provide a full view of > the states instead of the state of a specific operator? It would be helpful > when there is some unrestored states of a previously removed operator in > the savepoint. > > Best, > Paul Lam > > 在 2019年5月30日,09:55,vino yang <[hidden email]> 写道: > > Hi Seth, > > Glad to see this FLIP, big +1 for this feature! > > Best, > Vino > > Seth Wiesman <[hidden email]> 于2019年5月30日周四 上午7:14写道: > > Hey Everyone! > > Gordon and I have been discussing adding a savepoint connector to flink > for reading, writing and modifying savepoints. > > This is useful for: > > Analyzing state for interesting patterns > Troubleshooting or auditing jobs by checking for discrepancies in state > Bootstrapping state for new applications > Modifying savepoints such as: > Changing max parallelism > Making breaking schema changes > Correcting invalid state > > We are looking forward to your feedback! > > This is the FLIP: > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-43%3A+Savepoint+Connector > > Seth > > > > > > |
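To make Paul's request concrete, a purely hypothetical sketch of what such a metadata query could eventually look like - listOperators() does not exist in the FLIP, and as noted above, state names would additionally require extending the metadata file:

    // Hypothetical follow-up API, not part of the current proposal.
    ExistingSavepoint savepoint =
        Savepoint.load(env, "hdfs:///savepoints/savepoint-1", new MemoryStateBackend());
    // Operator ids are already recorded in the savepoint metadata file.
    for (OperatorID operatorId : savepoint.listOperators()) {
        System.out.println("operator: " + operatorId);
    }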
+1 from my side.

I think it will be a good feature.

Best
--
Louis
Email: [hidden email]
@Paul
I agree with Gordon that those are useful features. The only thing I’d like to add is that I don’t believe listing operator ids will be useful to most users; they want to see UIDs, which would also require changes to the savepoint metadata file. I think that would be a good follow-up but outside the scope of an initial implementation.

Seth
Hi,
@Gordon @Seth

Thanks a lot for your inputs! In general, I agree with you. The metadata querying feature is a nice-to-have but not a must-have, and it’s reasonable to make it a follow-up since it requires some extra work.

Best,
Paul Lam
this is an awesome feature.
> The name "Savepoint Connector" might indeed not be that good, as it doesn't point out the fact that with the current design, all kinds of snapshots (savepoint / full or incremental checkpoints) can be read.

@Gordon can you add the above clarification to the FLIP page? I was wondering if it supports (full or incremental) checkpoints.

Here is one way we can leverage this new feature. For monitoring lag/stuck problems, we have an external service monitor committed offsets on the Kafka broker. As you probably know, the async Kafka offset commit is best-effort in notifyCheckpointComplete. We have run into Kafka scalability issues for high-parallelism jobs due to the single coordinator. An alternative is to inspect the source of truth, the Flink checkpoint/savepoint, and extract offsets from the Kafka source operator.
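As an illustration of that alternative under the proposed reader API (the uid and path are placeholders; the union state name and element type mirror how FlinkKafkaConsumerBase stores offsets today):

    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // Illustrative only: extract committed Kafka offsets from a checkpoint/savepoint.
    DataSet<Tuple2<KafkaTopicPartition, Long>> offsets = Savepoint
        .load(env, "hdfs:///checkpoints/chk-42", new MemoryStateBackend())
        .readUnionState(
            "kafka-source-uid",
            "topic-partition-offset-states", // state name used by FlinkKafkaConsumerBase
            TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {}));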
Hi Seth,
big +1, happy to see this moving forward :) I have seen plenty of users who refrained from using managed state for some of their data/use cases due to the lack of something like this.

I am not sure about the name "Savepoint Connector", but for a different reason. While it is technically a "connector" for the DataSet API, the feature is much more than a connector, almost like a small "Savepoint API".

Best,

Konstantin
Hi,
this is an awesome and really useful feature. If I might ask for one thing to consider - would it be possible to make the savepoint manipulation API (at least writing the savepoint) less dependent on other parts of Flink internals (e.g. KeyedStateBootstrapFunction) and provide something more general (e.g. some generic Writer)? Why I'm asking for that - I can totally imagine a situation where users might want to create bootstrapped state with some other runner (e.g. Apache Spark), and then run Apache Flink after the state has been created. This makes even more sense in the context of Apache Beam, which provides all the necessary work to make this happen. The question is - would it be possible to design this feature so that writing the savepoint from a different runner would be possible?

Cheers,

Jan
@Konstantin agreed, that was a large impetus for this feature. Also, I am happy to change the name to something that better describes the feature set.

@Jan

Savepoints depend heavily on a number of Flink internal components that may change between versions: state backend internals, type serializers, the specific hash function used to turn a UID into an OperatorID, etc. I consider it a feature of this proposal that the library depends on those internal components instead of reverse engineering the binary format. This way, as those internals change, or new state features are added (think the recent addition of TTL), we will get support automatically. I do not believe anything else is maintainable.

Seth
Hi Seth,
that sounds reasonable. What I was asking for was not to reverse engineer the binary format, but to make the savepoint write API a little more reusable, so that it could be wrapped into some other technology. I don't know the details enough to propose a solution, but it seems to me that it could be possible to use something like a Writer instead of a Transform. Or maybe the Transform can use the Writer internally; the goal is just to enable creating the savepoint from "outside" of Flink (with some library, of course).

Jan
@Jan Gotcha,
So while it reuses components, it explicitly is not a writer. This is not a savepoint output format in the way we have a parquet output format. The reason for the Transform API is to hide the underlying details; it does not simply append an output writer to the end of a dataset. This gets into the implementation details, but at a high level the dataset is:

1) partitioned using key groups
2) run through a standard stream operator that takes a snapshot of its state after processing all records and outputs metadata handles for each subtask
3) those metadata handles are aggregated down to a single savepoint handle
4) that handle is written out as a final metadata file

What’s important here is that the API actually depends on the data flow collection, and state is written out as a side effect of taking a savepoint. The FLIP describes a loose coupling to the DataSet API for eventual migration to BoundedStream, that is true. However, the API does require knowing what concrete data flow is being used to perform these re-partitionings and post aggregations.

I’m linking to my prototype implementation, particularly what actually happens when you call write and run the transformations. Let me know if that helps clarify.

Seth

https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/api/WritableSavepoint.java#L63
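To make the four steps concrete, a pseudocode-level sketch of that dataflow - the class names approximate the linked prototype and the "..." placeholder means this is not compilable as written:

    bootstrapData
        // 1) re-partition records so each subtask owns exactly one key-group range
        .partitionCustom(new KeyGroupRangePartitioner(maxParallelism), keySelector)
        // 2) drive each partition through a real stream operator; after the last record
        //    it snapshots its state backend and emits a per-subtask state handle
        .mapPartition(new BoundedOneInputStreamTaskRunner<>(new KeyedStateBootstrapOperator<>(...)))
        // 3) aggregate the per-subtask handles into a single savepoint handle
        .reduceGroup(new MergeOperatorStates())
        // 4) write out the final metadata file
        .output(new SavepointOutputFormat(savepointPath));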
Hi Seth,
yes, that helped! :-)

What I was looking for is essentially `org.apache.flink.connectors.savepoint.output.SavepointOutputFormat`. It would be great if this could be written in a way that would enable reusing it in a different engine (as I mentioned - Apache Spark). There seem to be some issues though. It uses the interface Savepoint, which uses several other objects and interfaces from Flink's runtime. Maybe some convenience API might help - Apache Beam handles operator naming, so that definitely should be transferable between systems, but I'm not sure how to construct an OperatorID from this name. Would you think that it is possible to come up with something that could be used in this portable way?

I understand there are some more conditions that need to be satisfied (grouping, aggregating, ...), which would of course have to be handled by the target system. But Apache Beam can help leverage that. My idea would be that there can be a runner-specified PTransform that takes a PCollection of some tuples of `(operator name, key, state name, value1), (operator name, key, state name, value2)`, and the Runner's responsibility would be to group/aggregate this so that it can be written by a runner-provided writer (output format).

All of this would need a lot more design; these are just ideas of "what could be possible". I was just wondering if this FLIP can make some first steps towards this.

Many thanks for comments,

Jan
The SavepointOutputFormat only writes out the savepoint metadata file and should be mostly ignored.
The actual state is written out by stream operators and tied into the Flink runtime [1, 2, 3].

This is the most important part and the piece that I don’t think can be reasonably extracted.

Seth

[1] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/operators/KeyedStateBootstrapOperator.java#L84

[2] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/SnapshotUtils.java

[3] https://github.com/sjwiesman/flink/blob/savepoint-connector/flink-connectors/flink-connector-savepoint/src/main/java/org/apache/flink/connectors/savepoint/output/BoundedOneInputStreamTaskRunner.java
It seems like a recurring piece of feedback was a different name. I’d like to propose moving the functionality to the libraries module and naming this the State Processing API.
Seth
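To show the reading side that a name like “State Processing API” is meant to cover, here is a minimal sketch of loading an existing savepoint and reading one piece of operator state. The uid, state name, and savepoint path are hypothetical, and the entry points (Savepoint.load, readListState) are assumptions about how the library surface could look rather than a fixed API.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.Savepoint;

public class ReadSavepointSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Load an existing savepoint for inspection; the path is a placeholder.
        ExistingSavepoint savepoint = Savepoint.load(
            env, "hdfs:///savepoints/savepoint-abcdef", new MemoryStateBackend());

        // Read list-style operator state registered under a hypothetical uid and state name.
        DataSet<Long> offsets = savepoint.readListState("my-source-uid", "offsets", Types.LONG);

        // Any DataSet operation can follow: audits, pattern analysis, writing to a sink, ...
        offsets.print();
    }
}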
+1 to renaming it as State Processing API and adding it under the
flink-libraries module. I also think we can start with the development of the feature.

From the feedback so far, it seems like we're in a good spot to add in at least the initial version of this API, hopefully making it ready for 1.9.0.

Cheers,
Gordon