[DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

[DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

JingsongLee-2
Hi devs:

I am trying to implement a streaming file sink for table [1], similar to
StreamingFileSink. If the underlying format is a HiveFormat, or any format
that updates visibility through a metaStore, I have to update the metaStore
in notifyCheckpointComplete. However, this operation runs on the task side,
so every parallel task accesses the metaStore, which turns it into a
bottleneck.

So I'm curious whether we can support a notifyOnMaster hook for
notifyCheckpointComplete, similar to FinalizeOnMaster.

What do you think?

[1] https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing

Best,
Jingsong Lee

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

shimin yang
Hi Jingsong,

Big fan of this idea. We faced the same problem and resolved it by adding a
distributed lock. It would be nice to have this feature in JobMaster, so that
it can replace the lock.

Best,
Shimin

On Fri, Sep 6, 2019 at 12:20 PM, JingsongLee <[hidden email]> wrote:

> Hi devs:
>
> I am trying to implement a streaming file sink for table [1], similar to
> StreamingFileSink. If the underlying format is a HiveFormat, or any format
> that updates visibility through a metaStore, I have to update the metaStore
> in notifyCheckpointComplete. However, this operation runs on the task side,
> so every parallel task accesses the metaStore, which turns it into a
> bottleneck.
>
> So I'm curious whether we can support a notifyOnMaster hook for
> notifyCheckpointComplete, similar to FinalizeOnMaster.
>
> What do you think?
>
> [1]
> https://docs.google.com/document/d/15R3vZ1R_pAHcvJkRx_CWleXgl08WL3k_ZpnWSdzP7GY/edit?usp=sharing
>
> Best,
> Jingsong Lee

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Jark Wu-2
I think that until we have such an interface, we can let task-0 do the global finalize work.
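
A minimal sketch of the task-0 idea, written as plain Java so it runs without Flink on the classpath. FakeMetaStore, TaskZeroCommitSink and TaskZeroDemo are hypothetical names introduced only for illustration; the method merely mirrors the shape of notifyCheckpointComplete and is not the actual sink implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a metaStore client; it only records commits. */
class FakeMetaStore {
    final List<Long> committedCheckpoints = new ArrayList<>();

    synchronized void commit(long checkpointId) {
        committedCheckpoints.add(checkpointId);
    }
}

/** Models one parallel sink subtask; not a real Flink class. */
class TaskZeroCommitSink {
    private final int subtaskIndex;
    private final FakeMetaStore metaStore;

    TaskZeroCommitSink(int subtaskIndex, FakeMetaStore metaStore) {
        this.subtaskIndex = subtaskIndex;
        this.metaStore = metaStore;
    }

    /** Mirrors the shape of notifyCheckpointComplete on the task side. */
    void notifyCheckpointComplete(long checkpointId) {
        // Every subtask is notified, but only subtask 0 touches the metaStore.
        if (subtaskIndex == 0) {
            metaStore.commit(checkpointId);
        }
    }
}

class TaskZeroDemo {
    /** Notifies all subtasks of one checkpoint and returns the metaStore. */
    static FakeMetaStore run(int parallelism, long checkpointId) {
        FakeMetaStore metaStore = new FakeMetaStore();
        for (int i = 0; i < parallelism; i++) {
            new TaskZeroCommitSink(i, metaStore).notifyCheckpointComplete(checkpointId);
        }
        return metaStore;
    }

    public static void main(String[] args) {
        // Four subtasks are notified, yet the metaStore sees a single commit.
        System.out.println(run(4, 7L).committedCheckpoints); // prints [7]
    }
}
```

With this shape the metaStore receives one request per checkpoint instead of one per parallel subtask.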


Best,
Jark


> On Sep 6, 2019, at 13:39, shimin yang <[hidden email]> wrote:
>
> Hi Jingsong,
>
> Big fan of this idea. We faced the same problem and resolved it by adding a
> distributed lock. It would be nice to have this feature in JobMaster, so that
> it can replace the lock.
>
> Best,
> Shimin


Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Dian Fu-2
In reply to this post by shimin yang
Hi Jingsong,

Thanks for bringing up this discussion. You can take a look at the GlobalAggregateManager to see if it meets your requirements. It can be obtained via StreamingRuntimeContext#getGlobalAggregateManager().

Regards,
Dian

> On Sep 6, 2019, at 1:39 PM, shimin yang <[hidden email]> wrote:
>
> Hi Jingsong,
>
> Big fan of this idea. We faced the same problem and resolved it by adding a
> distributed lock. It would be nice to have this feature in JobMaster, so that
> it can replace the lock.
>
> Best,
> Shimin


Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

shimin yang
Hi Fu,

Thank you for the reminder. I think it would work in my case as long as it's
an atomic operation.

On Fri, Sep 6, 2019 at 2:22 PM, Dian Fu <[hidden email]> wrote:

> Hi Jingsong,
>
> Thanks for bringing up this discussion. You can take a look at the
> GlobalAggregateManager to see if it meets your requirements. It can be
> obtained via StreamingRuntimeContext#getGlobalAggregateManager().
>
> Regards,
> Dian

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Dian Fu-2
Hi Shimin,

It can be guaranteed to be an atomic operation. This is ensured by the RPC framework. You could take a look at RpcEndpoint for more details.
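
To illustrate the kind of guarantee meant here, below is a minimal plain-Java sketch in which every state update is executed by one dedicated thread, so concurrent callers cannot interleave inside an update. SingleThreadedEndpoint and EndpointDemo are hypothetical names; this only models the single-threaded execution idea and is not the RpcEndpoint implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical endpoint: all state changes run on one dedicated thread. */
class SingleThreadedEndpoint implements AutoCloseable {
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    // Deliberately not volatile or atomic: only mainThread ever touches it.
    private long counter;

    /** Callers on any thread enqueue the update; it runs on mainThread. */
    CompletableFuture<Long> incrementAndGet() {
        return CompletableFuture.supplyAsync(() -> ++counter, mainThread);
    }

    /** Reads go through the same thread, so they see all earlier updates. */
    CompletableFuture<Long> get() {
        return CompletableFuture.supplyAsync(() -> counter, mainThread);
    }

    @Override
    public void close() {
        mainThread.shutdown();
    }
}

class EndpointDemo {
    /** Lets several caller threads update the endpoint concurrently. */
    static long runConcurrentUpdates(int callers, int updatesPerCaller) {
        try (SingleThreadedEndpoint endpoint = new SingleThreadedEndpoint()) {
            List<Thread> threads = new ArrayList<>();
            for (int i = 0; i < callers; i++) {
                Thread t = new Thread(() -> {
                    for (int j = 0; j < updatesPerCaller; j++) {
                        endpoint.incrementAndGet().join();
                    }
                });
                threads.add(t);
                t.start();
            }
            for (Thread t : threads) {
                t.join();
            }
            return endpoint.get().join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // No update is lost although the counter itself is unsynchronized.
        System.out.println(runConcurrentUpdates(8, 1000)); // prints 8000
    }
}
```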

Regards,
Dian

> On Sep 6, 2019, at 2:35 PM, shimin yang <[hidden email]> wrote:
>
> Hi Fu,
>
> Thank you for the reminder. I think it would work in my case as long as it's
> an atomic operation.


Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

shimin yang
Hi Fu,

That'll be nice.

Thanks.

Best,
Shimin

On Fri, Sep 6, 2019 at 3:17 PM, Dian Fu <[hidden email]> wrote:

> Hi Shimin,
>
> It can be guaranteed to be an atomic operation. This is ensured by the RPC
> framework. You could take a look at RpcEndpoint for more details.
>
> Regards,
> Dian

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

JingsongLee-2
Thanks Jark and Dian:
1. Jark's approach: do the work in task-0. This is the simple way.
2. Dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager. It can support more operations, but these accumulators are not fault-tolerant, are they?

Best,
Jingsong Lee


------------------------------------------------------------------
From:shimin yang <[hidden email]>
Send Time: Fri, Sep 6, 2019, 15:21
To:dev <[hidden email]>
Subject:Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Hi Fu,

That'll be nice.

Thanks.

Best,
Shimin


Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

shimin yang
Hi Jingsong,

Although it would be nice if the accumulators in GlobalAggregateManager were
fault-tolerant, we could still take advantage of managed state to guarantee
the semantics and use the accumulators to implement a distributed barrier or
lock to solve the distributed access problem.

Best,
Shimin

On Mon, Sep 9, 2019 at 1:33 PM, JingsongLee <[hidden email]> wrote:

> Thanks Jark and Dian:
> 1. Jark's approach: do the work in task-0. This is the simple way.
> 2. Dian's approach: use StreamingRuntimeContext#getGlobalAggregateManager.
> It can support more operations, but these accumulators are not
> fault-tolerant, are they?
>
> Best,
> Jingsong Lee

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Dian Fu-2
Hi Jingsong,

Good point!

1. If it doesn't matter which task performs the finalize work, then I think task-0, as suggested by Jark, is a very good solution.
2. If the last finished task has to perform the finalize work, then we have to consider other solutions.
  WRT fault tolerance of StreamingRuntimeContext#getGlobalAggregateManager, AFAIK, there is no built-in support.
    1) Regarding TM failover, I think it's not a problem. We can use an accumulator, e.g. finish_count, which is increased by 1 when a sub-task finishes (i.e. its close() method is called).
       When finish_count == RuntimeContext.getNumberOfParallelSubtasks() for some sub-task, we know that it is the last finished sub-task. This holds true even in case of TM failover.
    2) Regarding JM failover, I have no idea how to work around it so far. Maybe @Jamie Grier, who is the author of this feature, could share more thoughts. I am not sure whether there is already a solution or plan to support JM failover, or whether this feature is simply not designed for this kind of use case.
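
The finish_count idea could be sketched roughly as follows, in plain Java without Flink dependencies. InMemoryAggregateManager is a hypothetical stand-in for the master-side aggregate store (it is not the GlobalAggregateManager API), and FinishingSubtask and FinishCountDemo are likewise names made up for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory stand-in for a master-side aggregate store. */
class InMemoryAggregateManager {
    private final Map<String, Integer> aggregates = new HashMap<>();

    /** Applies the update atomically and returns the new value. */
    synchronized int updateGlobalAggregate(String name, int delta) {
        return aggregates.merge(name, delta, Integer::sum);
    }
}

/** Models a sub-task that reports to the master when it finishes. */
class FinishingSubtask {
    private final int parallelism;
    private final InMemoryAggregateManager manager;

    FinishingSubtask(int parallelism, InMemoryAggregateManager manager) {
        this.parallelism = parallelism;
        this.manager = manager;
    }

    /** Meant to be called from close(); true only for the last finisher. */
    boolean closeAndCheckLast() {
        int finished = manager.updateGlobalAggregate("finish_count", 1);
        return finished == parallelism;
    }
}

class FinishCountDemo {
    /** Closes sub-tasks in the given order and collects who saw itself as last. */
    static List<Integer> lastFinishers(int[] closeOrder) {
        InMemoryAggregateManager manager = new InMemoryAggregateManager();
        List<Integer> result = new ArrayList<>();
        for (int subtask : closeOrder) {
            FinishingSubtask task = new FinishingSubtask(closeOrder.length, manager);
            if (task.closeAndCheckLast()) {
                result.add(subtask);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sub-task 1 closes last, so it alone would do the finalize work.
        System.out.println(lastFinishers(new int[] {2, 0, 3, 1})); // prints [1]
    }
}
```

Note that the count in this sketch lives only in the memory of the stand-in manager, so it would be lost if that process restarted. That is the JM failover concern from point 2).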

Regards,
Dian

> On Sep 9, 2019, at 3:08 PM, shimin yang <[hidden email]> wrote:
>
> Hi Jingsong,
>
> Although it would be nice if the accumulators in GlobalAggregateManager were
> fault-tolerant, we could still take advantage of managed state to guarantee
> the semantics and use the accumulators to implement a distributed barrier or
> lock to solve the distributed access problem.
>
> Best,
> Shimin


Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Stephan Ewen
Hi all!

I think it would be time to rethink the Sink API as a whole, like we did
with the Source API in FLIP-27.
It would be nice to have a proper design that handles all this consistently,
rather than adding one more hook.

For example:
  - For batch, you can already use the existing "finalize on master" hook
  - For streaming, it is tricky to "commit on end-of-stream" reliably
(tolerating failures)
  - write ahead versus direct writing
  - transactional versus idempotent
  - temp files and renaming versus recoverable writer
==> All these things have special cases currently, rather than a coherent
design.

Best,
Stephan


On Tue, Sep 10, 2019 at 6:40 AM Dian Fu <[hidden email]> wrote:

> Hi Jingsong,
>
> Good point!
>
> 1. If it doesn't matter which task performs the finalize work, then I
> think task-0, as suggested by Jark, is a very good solution.
> 2. If the last finished task has to perform the finalize work, then we
> have to consider other solutions.
>   WRT fault tolerance of StreamingRuntimeContext#getGlobalAggregateManager,
> AFAIK, there is no built-in support.
>     1) Regarding TM failover, I think it's not a problem. We can use an
> accumulator, e.g. finish_count, which is increased by 1 when a sub-task
> finishes (i.e. its close() method is called).
>        When finish_count == RuntimeContext.getNumberOfParallelSubtasks()
> for some sub-task, we know that it is the last finished sub-task.
> This holds true even in case of TM failover.
>     2) Regarding JM failover, I have no idea how to work around it so
> far. Maybe @Jamie Grier, who is the author of this feature, could share more
> thoughts. I am not sure whether there is already a solution or plan to
> support JM failover, or whether this feature is simply not designed for this
> kind of use case.
>
> Regards,
> Dian

Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

JingsongLee-2
+1 to rethink

Best,
Jingsong Lee


------------------------------------------------------------------
From:Stephan Ewen <[hidden email]>
Send Time: Tue, Sep 10, 2019, 15:40
To:dev <[hidden email]>
Cc:JingsongLee <[hidden email]>
Subject:Re: [DISCUSS] Support notifyOnMaster for notifyCheckpointComplete

Hi all!

I think it would be time to rethink the Sink API as a whole, like we did
with the Source API in FLIP-27.
It would be nice to have a proper design that handles all this consistently,
rather than adding one more hook.

For example:
  - For batch, you can already use the existing "finalize on master" hook
  - For streaming, it is tricky to "commit on end-of-stream" reliably
(tolerating failures)
  - write ahead versus direct writing
  - transactional versus idempotent
  - temp files and renaming versus recoverable writer
==> All these things have special cases currently, rather than a coherent
design.

Best,
Stephan

