Sharing state between subtasks

Re: Sharing state between subtasks

Till Rohrmann
Hi Thomas,

using Akka directly would further manifest our dependency on Scala in
flink-runtime. This is something we are currently trying to get rid of. For
that purpose we have added the RpcService abstraction which encapsulates
all Akka specific logic. We hope that we can soon get rid of the Scala
dependency in flink-runtime by using a special class loader only for
loading the AkkaRpcService implementation.

I think the easiest way to sync the task information is actually going
through the JobMaster because the subtasks don't know on which other TMs
the other subtasks run. Otherwise, we would need to have some TM detection
mechanism between TMs. If you choose this way, then you should be able to
use the RpcService by extending the JobMasterGateway by additional RPCs.

Cheers,
Till
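Till's suggestion of syncing through the JobMaster could look roughly like the sketch below: subtasks report their watermarks over an RPC-style gateway and query the global minimum back. The gateway name, methods, and types here are hypothetical illustrations, and the actual RpcService/JobMasterGateway wiring is omitted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical shape of the additional RPCs: each source subtask reports
// its current watermark to the JobMaster, which hands back the global
// minimum across all subtasks. Names are invented for illustration.
interface WatermarkSyncGateway {
    void reportWatermark(int subtaskIndex, long watermark);
    long getGlobalMinWatermark();
}

class InMemoryWatermarkSync implements WatermarkSyncGateway {
    private final ConcurrentMap<Integer, Long> watermarks = new ConcurrentHashMap<>();

    @Override
    public void reportWatermark(int subtaskIndex, long watermark) {
        // Keep only the latest reported watermark per subtask.
        watermarks.put(subtaskIndex, watermark);
    }

    @Override
    public long getGlobalMinWatermark() {
        // The slowest subtask determines global event-time progress.
        return watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

A subtask that is far ahead of the global minimum could then throttle itself until the others catch up.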

On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <[hidden email]> wrote:

> Hi,
>
> We are planning to work on the Kinesis consumer in the following order:
>
> 1. Add per shard watermarking:
> https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
> already use internally; I will open a PR to add it to the Flink Kinesis
> consumer
> 2. Exchange of per subtask watermarks between all subtasks of one or
> multiple sources
> 3. Implement the queue approach described in Jamie's document to utilize
> 1.) and 2.) to align the shard consumers WRT event time
>
> There was some discussion regarding the mechanism to share the watermarks
> between subtasks. If there is something that can be re-used it would be
> great. Otherwise I'm going to further investigate the Akka or JGroups
> route. Regarding Akka, since it is used within Flink already, is there an
> abstraction that you would recommend to consider to avoid direct
> dependency?
>
> Thanks,
> Thomas
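The per-shard watermarking in step 1 above, and the per-subtask watermark it would feed into step 2, can be sketched as follows. This is a minimal illustration with hypothetical names, not the actual FLINK-5697 code: each shard's watermark only advances, and the subtask's watermark is the minimum over the shards it owns.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: track one watermark per Kinesis shard; the subtask-level
// watermark is the minimum across all shards this subtask consumes.
class PerShardWatermarkTracker {
    private final Map<String, Long> shardWatermarks = new HashMap<>();

    void onRecord(String shardId, long eventTimestamp) {
        // A shard's watermark only ever moves forward.
        shardWatermarks.merge(shardId, eventTimestamp, Math::max);
    }

    long subtaskWatermark() {
        // The subtask can only promise event time up to its slowest shard.
        return shardWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```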
>
>
>
> On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> <[hidden email]> wrote:
>
> > Not yet. We only have some initial thoughts and have not worked on it
> > yet. We will update the progress in this discussion if we have any.
> >
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > From: Aljoscha Krettek <[hidden email]>
> > Sent: Thursday, October 18, 2018, 17:53
> > To: dev <[hidden email]>; Zhijiang(wangzhijiang999) <[hidden email]>
> > Cc: Till Rohrmann <[hidden email]>
> > Subject: Re: Sharing state between subtasks
> >
> > Hi Zhijiang,
> >
> > do you already have working code or a design doc for the second approach?
> >
> > Best,
> > Aljoscha
> >
> > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > [hidden email]> wrote:
> > >
> > > Just noticed this discussion from @Till Rohrmann's weekly community
> > > update, and I want to share some thoughts from our experiences.
> > >
> > > We also encountered the source consumption skew issue before, and we
> > > have been focused on improving it in two possible ways.
> > >
> > > 1. Control the read strategy on the downstream side. In detail, every
> > > input channel in a downstream task corresponds to the consumption of
> > > one upstream source task, and we tag each input channel with a
> > > watermark to find the lowest channel to read with high priority. In
> > > essence, we rely on the backpressure mechanism: if the channel with
> > > the highest timestamp is not read by the downstream task for a while,
> > > it will block the corresponding source task from reading once the
> > > buffers are exhausted. There is no need to change the source interface
> > > in this way, but there are two major concerns: first, it affects
> > > barrier alignment, resulting in delayed or expired checkpoints;
> > > second, it cannot align source consumption very precisely, as it is
> > > just a best-effort approach. So we finally gave up on this way.
> > >
> > > 2. Add a new SourceCoordinator component to coordinate source
> > > consumption in a distributed way. For example, we can start this
> > > component in the JobManager, like the current role of the
> > > CheckpointCoordinator. Then every source task would communicate with
> > > the JobManager via the current RPC mechanism; maybe we can rely on the
> > > heartbeat message to attach the consumption progress as the payload.
> > > The JobManager will accumulate all the reported progress and then give
> > > responses to the different source tasks. We can define a protocol, for
> > > example, for telling a fast source task to sleep for a specific time.
> > > This way the coordinator has the global information to make the proper
> > > decision for individual tasks, so it seems more precise. And it will
> > > not affect barrier alignment, because a sleeping fast source can still
> > > release the lock to emit barriers as normal. The only concern is that
> > > the source interface changes may affect all related source
> > > implementations.
> > >
> > > Currently we prefer to implement the second way and will incorporate
> > > other good points from above. :)
> > >
> > > Best,
> > > Zhijiang
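The coordinator protocol in option 2 above could be sketched roughly as follows: source tasks report their consumption progress (for instance as heartbeat payloads), and the coordinator answers with how long a task that is too far ahead should pause. The class name, methods, and skew threshold are all invented for illustration; the real RPC transport is not shown.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of a SourceCoordinator running on the JobManager side: it
// accumulates per-subtask event-time progress and suggests a sleep
// duration for tasks that are ahead of the slowest one by more than an
// allowed skew.
class SourceCoordinatorSketch {
    private final Map<Integer, Long> progress = new HashMap<>();
    private final long maxAheadMillis;

    SourceCoordinatorSketch(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    // Called for each progress report; returns the suggested sleep time
    // in milliseconds (0 means: keep reading).
    long reportProgress(int subtaskIndex, long eventTime) {
        progress.put(subtaskIndex, eventTime);
        long min = progress.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(eventTime);
        long ahead = eventTime - min;
        // Tasks within the allowed skew keep reading; faster ones back off.
        return ahead <= maxAheadMillis ? 0L : ahead - maxAheadMillis;
    }
}
```

Because a paused task is merely sleeping rather than blocked on backpressure, it can still emit checkpoint barriers, which is the property Zhijiang highlights.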
> > > ------------------------------------------------------------------
> > > From: Jamie Grier <[hidden email]>
> > > Sent: Wednesday, October 17, 2018, 03:28
> > > To: dev <[hidden email]>
> > > Subject: Re: Sharing state between subtasks
> > >
> > > Here's a doc I started describing some changes we would like to make,
> > > starting with the Kinesis Source. It describes a refactoring of that
> > > code specifically, and also hopefully a pattern and some reusable code
> > > we can use in the other sources as well.  The end goal would be
> > > best-effort event-time synchronization across all Flink sources, but
> > > we are going to start with the Kinesis Source first.
> > >
> > > Please take a look and provide thoughts and opinions about the best
> > > state sharing mechanism to use -- that section is left blank and
> > > we're especially looking for input there.
> > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > >
> > > -Jamie
> > >
> > >
> > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <[hidden email]>
> > wrote:
> > >
> > >> But on the Kafka source level it should be perfectly fine to do what
> > >> Elias proposed. This of course is not the perfect solution but could
> > >> bring us forward quite a bit. The changes required for this should
> > >> also be minimal. This would become obsolete once we have something
> > >> like shared state. But until then, I think it would be worth a try.
> > >>
> > >> Cheers,
> > >> Till
> > >>
> > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]
> >
> > >> wrote:
> > >>
> > >>> The reason this selective reading doesn't work well in Flink at the
> > >>> moment is because of checkpointing. For checkpointing, checkpoint
> > >>> barriers travel within the streams. If we selectively read from
> > >>> inputs based on timestamps, this is akin to blocking an input if
> > >>> that input is very far ahead in event time, which can happen when
> > >>> you have a very fast source and a slow source (in event time), maybe
> > >>> because you're in a catchup phase. In those cases it's better to
> > >>> simply not read the data at the sources, as Thomas said. This is
> > >>> also because with Kafka Streams, each operator is basically its own
> > >>> job: it's reading from Kafka and writing to Kafka and there is not a
> > >>> complex graph of different operations with network shuffles in
> > >>> between, as you have with Flink.
> > >>>
> > >>> This different nature of Flink is also why I think that readers need
> > >>> awareness of other readers to do the event-time alignment, and this
> > >>> is where shared state comes in.
> > >>>
> > >>>> On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
> > >>> wrote:
> > >>>>
> > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]>
> > >> wrote:
> > >>>>
> > >>>>> I think the new source interface would be designed to be able to
> > >>>>> leverage shared state to achieve time alignment. I don't think
> > >>>>> this would be possible without some kind of shared state.
> > >>>>>
> > >>>>> The problem of tasks that are far ahead in time cannot be solved
> > >>>>> with back-pressure. That's because a task cannot choose from which
> > >>>>> source task it accepts events and from which it doesn't. If it
> > >>>>> blocks an input, all downstream tasks that are connected to the
> > >>>>> operator are affected. This can easily lead to deadlocks.
> > >>>>> Therefore, all operators need to be able to handle events when
> > >>>>> they arrive. If they cannot process them yet because they are too
> > >>>>> far ahead in time, they are put in state.
> > >>>>>
> > >>>>
> > >>>> The idea I was suggesting is not for operators to block an input.
> > >>>> Rather, it is that they selectively choose which input to process
> > >>>> the next message from, based on its timestamp, so long as there are
> > >>>> buffered messages waiting to be processed.  That is a best-effort
> > >>>> alignment strategy.  It seems to work relatively well in practice,
> > >>>> at least within Kafka Streams.
> > >>>>
> > >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate
> > >>>> for both its inputs.  Instead, it could keep them separate and
> > >>>> selectively consume from the one that has a buffer available, and
> > >>>> if both have buffers available, from the buffer with the messages
> > >>>> with the lower timestamp.
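Elias's two-input selection idea can be sketched as a tiny standalone example: with two separate input buffers, prefer the buffered message with the lower timestamp, and fall back to whichever side has data. This is an illustration only, not the actual StreamTwoInputProcessor code; records are just (timestamp, payload) pairs here.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch: keep the two inputs as separate queues and pick the head
// record with the lower timestamp for best-effort event-time alignment.
class TwoInputSelector {
    record Rec(long timestamp, String payload) {}

    final Deque<Rec> input1 = new ArrayDeque<>();
    final Deque<Rec> input2 = new ArrayDeque<>();

    // Returns the next record to process, or null if both inputs are empty.
    Rec next() {
        Rec a = input1.peek();
        Rec b = input2.peek();
        if (a == null && b == null) return null;
        if (a == null) return input2.poll();   // only input2 has data
        if (b == null) return input1.poll();   // only input1 has data
        // Both buffered: take the earlier event first.
        return a.timestamp() <= b.timestamp() ? input1.poll() : input2.poll();
    }
}
```

Note this never blocks an input: if only one side has buffered data, it is consumed immediately, which is what makes the alignment best-effort.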
> > >>>
> > >>>
> > >>
> > >
> >
> >
> >
>

Re: Sharing state between subtasks

Jamie Grier-3
Hey all,

I think all we need for this on the state sharing side is pretty simple.  I
opened a JIRA to track this work and submitted a PR for the state sharing
bit.

https://issues.apache.org/jira/browse/FLINK-10886
https://github.com/apache/flink/pull/7099

Please provide feedback :)

-Jamie


On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <[hidden email]> wrote:


Re: Sharing state between subtasks

gerardg
Any progress related to synchronizing ingestion by event time or
ingestion time between Kafka partitions?

On Thu, Nov 15, 2018 at 1:27 AM Jamie Grier <[hidden email]> wrote:

>

Re: Sharing state between subtasks

Thomas Weise
The state sharing support will be part of the upcoming 1.8 release.

We have also completed most of the synchronization work for the Kinesis
consumer and will contribute those changes to Flink soon.

Most of the code will be reusable for the Kafka consumer. We will need
the same support there but have not started work to integrate it yet.

Thomas

On Thu, Mar 7, 2019 at 6:53 AM Gerard Garcia <[hidden email]> wrote:

> Any progress related to synchronizing ingestion by event/ingestion time
> between Kafka partitions?
>
> On Thu, Nov 15, 2018 at 1:27 AM Jamie Grier <[hidden email]>
> wrote:
>
> > Hey all,
> >
> > I think all we need for this on the state sharing side is pretty
> simple.  I
> > opened a JIRA to track this work and submitted a PR for the state sharing
> > bit.
> >
> > https://issues.apache.org/jira/browse/FLINK-10886
> > https://github.com/apache/flink/pull/7099
> >
> > Please provide feedback :)
> >
> > -Jamie
> >
> >
> > On Thu, Nov 1, 2018 at 3:33 AM Till Rohrmann <[hidden email]>
> wrote:
> >
> > > Hi Thomas,
> > >
> > > using Akka directly would further manifest our dependency on Scala in
> > > flink-runtime. This is something we are currently trying to get rid of.
> > For
> > > that purpose we have added the RpcService abstraction which
> encapsulates
> > > all Akka specific logic. We hope that we can soon get rid of the Scala
> > > dependency in flink-runtime by using a special class loader only for
> > > loading the AkkaRpcService implementation.
> > >
> > > I think the easiest way to sync the task information is actually going
> > > through the JobMaster because the subtasks don't know on which other
> TMs
> > > the other subtasks run. Otherwise, we would need to have some TM
> > detection
> > > mechanism between TMs. If you choose this way, then you should be able
> to
> > > use the RpcService by extending the JobMasterGateway by additional
> RPCs.
> > >
> > > Cheers,
> > > Till
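As a rough sketch of the bookkeeping such a JobMaster-side RPC could back (hypothetical names, not the actual JobMasterGateway API): each source subtask reports its current watermark, and the global watermark handed back is the minimum over all subtasks.

```java
import java.util.Arrays;

/**
 * Minimal sketch (not actual Flink code) of the aggregation the JobMaster
 * could perform if the JobMasterGateway were extended with a watermark RPC:
 * each source subtask reports its current watermark, and the global
 * watermark is the minimum across all subtasks.
 */
public class WatermarkAggregator {
    private final long[] subtaskWatermarks;

    public WatermarkAggregator(int parallelism) {
        subtaskWatermarks = new long[parallelism];
        // Subtasks that have not reported yet hold the global watermark back.
        Arrays.fill(subtaskWatermarks, Long.MIN_VALUE);
    }

    /** Would be invoked by an RPC such as reportWatermark(subtaskIndex, wm). */
    public synchronized void reportWatermark(int subtaskIndex, long watermark) {
        subtaskWatermarks[subtaskIndex] =
            Math.max(subtaskWatermarks[subtaskIndex], watermark);
    }

    /** The value the JobMaster would hand back to the subtasks. */
    public synchronized long globalWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : subtaskWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```

A real implementation would expose these two methods as RPCs on the gateway; this sketch only shows the aggregation logic itself.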
> > >
> > > On Wed, Oct 31, 2018 at 6:52 PM Thomas Weise <[hidden email]> wrote:
> > >
> > > > Hi,
> > > >
> > > > We are planning to work on the Kinesis consumer in the following
> order:
> > > >
> > > > 1. Add per shard watermarking:
> > > > https://issues.apache.org/jira/browse/FLINK-5697 - this will be code
> > we
> > > > already use internally; I will open a PR to add it to the Flink
> Kinesis
> > > > consumer
> > > > 2. Exchange of per subtask watermarks between all subtasks of one or
> > > > multiple sources
> > > > 3. Implement the queue approach described in Jamie's document,
> > > > utilizing 1.) and 2.) to align the shard consumers WRT event time
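A minimal sketch of what step 1 (per-shard watermarking, in the spirit of FLINK-5697) could look like, with hypothetical names rather than the actual consumer code: each shard tracks the maximum timestamp it has emitted, and the subtask-level watermark is the minimum over all shards, so one slow shard holds the watermark back.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Rough sketch of per-shard watermark tracking (names invented for
 * illustration): the subtask watermark is the minimum over the per-shard
 * high-water marks, so a lagging shard dominates the result.
 */
public class PerShardWatermarkTracker {
    private final Map<String, Long> shardWatermarks = new HashMap<>();

    /** Record the timestamp of an event read from the given shard. */
    public void onRecord(String shardId, long eventTimestamp) {
        shardWatermarks.merge(shardId, eventTimestamp, Math::max);
    }

    /** Subtask watermark: the minimum across all known shards. */
    public long subtaskWatermark() {
        if (shardWatermarks.isEmpty()) {
            return Long.MIN_VALUE;
        }
        long min = Long.MAX_VALUE;
        for (long wm : shardWatermarks.values()) {
            min = Math.min(min, wm);
        }
        return min;
    }
}
```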
> > > >
> > > > There was some discussion regarding the mechanism to share the
> > watermarks
> > > > between subtasks. If there is something that can be re-used it would
> be
> > > > great. Otherwise I'm going to further investigate the Akka or JGroups
> > > > route. Regarding Akka, since it is used within Flink already, is
> there
> > an
> > > > abstraction that you would recommend to consider to avoid direct
> > > > dependency?
> > > >
> > > > Thanks,
> > > > Thomas
> > > >
> > > >
> > > >
> > > > On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
> > > > <[hidden email]> wrote:
> > > >
> > > > > Not yet. We only have some initial thoughts and have not worked on
> it
> > > > yet.
> > > > > We will update the progress in this discussion if have.
> > > > >
> > > > > Best,
> > > > > Zhijiang
> > > > > ------------------------------------------------------------------
> > > > > From: Aljoscha Krettek <[hidden email]>
> > > > > Sent: Thursday, October 18, 2018, 17:53
> > > > > To: dev <[hidden email]>; Zhijiang(wangzhijiang999) <
> > > > > [hidden email]>
> > > > > Cc: Till Rohrmann <[hidden email]>
> > > > > Subject: Re: Sharing state between subtasks
> > > > >
> > > > > Hi Zhijiang,
> > > > >
> > > > > do you already have working code or a design doc for the second
> > > approach?
> > > > >
> > > > > Best,
> > > > > Aljoscha
> > > > >
> > > > > > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> > > > > [hidden email]> wrote:
> > > > > >
> > > > > > Just noticed this discussion from @Till Rohrmann's weekly
> community
> > > > > update and I want to share some thoughts from our experiences.
> > > > > >
> > > > > > We also encountered the source consumption skew issue before,
> > > > > > and we are focused on improving this in two possible ways.
> > > > > >
> > > > > > 1. Control the read strategy on the downstream side. In detail,
> > > > > > every input channel in the downstream task corresponds to the
> > > > > > consumption of one upstream source task, and we tag each input
> > > > > > channel with a watermark in order to find the lowest channel to
> > > > > > read with high priority. In essence, we actually rely on the
> > > > > > mechanism of backpressure. If the channel with the highest
> > > > > > timestamp is not read by the downstream task for a while, it will
> > > > > > block the corresponding source task from reading once the buffers
> > > > > > are exhausted. There is no need to change the source interface in
> > > > > > this way, but there are two major concerns: first, it will affect
> > > > > > the barrier alignment, resulting in checkpoints being delayed or
> > > > > > expiring. Second, it cannot ensure source consumption alignment
> > > > > > very precisely; it is just a best-effort way. So we finally gave
> > > > > > up on this way.
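The channel-selection idea in approach 1 could be sketched as follows (hypothetical names, not Flink's InputGate API): among input channels that currently have buffered data, read from the one with the lowest last-seen watermark, so the furthest-behind upstream source is drained first.

```java
/**
 * Illustrative sketch (invented names, not actual Flink code) of
 * watermark-based channel selection: prefer the channel that is furthest
 * behind in event time among those with data available.
 */
public class LowestWatermarkChannelSelector {
    /**
     * @param channelWatermarks last watermark observed per input channel
     * @param hasData whether each channel currently has a buffered record
     * @return index of the channel to read next, or -1 if none has data
     */
    public static int selectChannel(long[] channelWatermarks, boolean[] hasData) {
        int selected = -1;
        long lowest = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (hasData[i] && channelWatermarks[i] < lowest) {
                lowest = channelWatermarks[i];
                selected = i;
            }
        }
        return selected;
    }
}
```

Note how this captures the best-effort nature discussed in the thread: a channel without buffered data is simply skipped, so alignment is only approximate.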
> > > > > >
> > > > > > 2. Add a new SourceCoordinator component to coordinate the source
> > > > > > consumption in a distributed way. For example, we can start this
> > > > > > component in the JobManager, like the current role of the
> > > > > > CheckpointCoordinator. Then every source task would communicate
> > > > > > with the JobManager via the current RPC mechanism; maybe we can
> > > > > > rely on the heartbeat message to attach the consumption progress
> > > > > > as the payload. The JobManager will accumulate all the reported
> > > > > > progress and then give responses to the different source tasks.
> > > > > > We can define a protocol, for example indicating that a fast
> > > > > > source task should sleep for a specific time. This way, the
> > > > > > coordinator has the global information to give the proper
> > > > > > decision for individuals, so it seems more precise. And it will
> > > > > > not affect the barrier alignment, because the sleeping fast
> > > > > > source can release the lock to emit barriers as normal. The only
> > > > > > concern is the changes to the source interface, which may affect
> > > > > > all related source implementations.
> > > > > >
> > > > > > Currently we prefer the second way to implement, and will refer to
> > > > > > the other good points above. :)
> > > > > >
> > > > > > Best,
> > > > > > Zhijiang
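A toy sketch of the SourceCoordinator protocol from approach 2 above (all names and the sleep heuristic are invented for illustration, this is not a proposed Flink API): source tasks report their event-time progress, for example as a heartbeat payload, and the coordinator answers with how long a task ahead of the slowest one should pause.

```java
/**
 * Toy sketch of a JobManager-side source coordinator: tracks per-subtask
 * event-time progress and tells fast subtasks how long to sleep so the
 * slowest one can catch up. The cap of one second per round is an
 * arbitrary illustrative choice.
 */
public class SourceCoordinatorSketch {
    private final long[] progress; // event-time progress per source subtask

    public SourceCoordinatorSketch(int parallelism) {
        progress = new long[parallelism];
    }

    /**
     * Called with the progress attached to a heartbeat from one subtask;
     * returns the suggested sleep time in milliseconds for that subtask.
     */
    public synchronized long reportProgress(int subtask, long eventTime) {
        progress[subtask] = eventTime;
        long slowest = Long.MAX_VALUE;
        for (long p : progress) {
            slowest = Math.min(slowest, p);
        }
        // A task ahead of the slowest one sleeps proportionally to its lead;
        // as noted above, a sleeping source can still emit barriers as normal.
        long lead = eventTime - slowest;
        return lead > 0 ? Math.min(lead, 1_000L) : 0L;
    }
}
```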
> > > > > >
> ------------------------------------------------------------------
> > > > > > From: Jamie Grier <[hidden email]>
> > > > > > Sent: Wednesday, October 17, 2018, 03:28
> > > > > > To: dev <[hidden email]>
> > > > > > Subject: Re: Sharing state between subtasks
> > > > > >
> > > > > > Here's a doc I started describing some changes we would like to
> > make
> > > > > > starting with the Kinesis Source. It describes a refactoring of
> > that
> > > > > code
> > > > > > specifically and also hopefully a pattern and some reusable code
> we
> > > can
> > > > > use
> > > > > > in the other sources as well.  The end goal would be best-effort
> > > > > event-time
> > > > > > synchronization across all Flink sources but we are going to
> start
> > > with
> > > > > the
> > > > > > Kinesis Source first.
> > > > > >
> > > > > > Please take a look and please provide thoughts and opinions about
> > the
> > > > > best
> > > > > > state sharing mechanism to use -- that section is left blank and
> > > we're
> > > > > > especially looking for input there.
> > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> > > > > >
> > > > > > -Jamie
> > > > > >
> > > > > >
> > > > > > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <
> > [hidden email]
> > > >
> > > > > wrote:
> > > > > >
> > > > > >> But on the Kafka source level it should be perfectly fine to do
> > what
> > > > > Elias
> > > > > >> proposed. This is of course not the perfect solution, but
> could
> > > > bring
> > > > > us
> > > > > >> forward quite a bit. The changes required for this should also
> be
> > > > > minimal.
> > > > > >> This would become obsolete once we have something like shared
> > state.
> > > > But
> > > > > >> until then, I think it would be worth a try.
> > > > > >>
> > > > > >> Cheers,
> > > > > >> Till
> > > > > >>
> > > > > >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <
> > > [hidden email]
> > > > >
> > > > > >> wrote:
> > > > > >>
> > > > > >>> The reason this selective reading doesn't work well in Flink at
> > > > > >>> the moment
> > > > > >>> is because of checkpointing. For checkpointing, checkpoint
> > barriers
> > > > > >> travel
> > > > > >>> within the streams. If we selectively read from inputs based on
> > > > > >> timestamps
> > > > > >>> this is akin to blocking an input if that input is very far
> ahead
> > > in
> > > > > >> event
> > > > > >>> time, which can happen when you have a very fast source and a
> > slow
> > > > > source
> > > > > >>> (in event time), maybe because you're in a catchup phase. In
> > those
> > > > > cases
> > > > > >>> it's better to simply not read the data at the sources, as
> Thomas
> > > > said.
> > > > > >>> This is also because with Kafka Streams, each operator is
> > basically
> > > > its
> > > > > >> own
> > > > > >>> job: it's reading from Kafka and writing to Kafka and there is
> > not
> > > a
> > > > > >>> complex graph of different operations with network shuffles in
> > > > between,
> > > > > >> as
> > > > > >>> you have with Flink.
> > > > > >>>
> > > > > >>> This different nature of Flink is also why I think that readers
> > > need
> > > > > >>> awareness of other readers to do the event-time alignment, and
> > this
> > > > is
> > > > > >>> where shared state comes in.
> > > > > >>>
> > > > > >>>> On 10. Oct 2018, at 20:47, Elias Levy <
> > > [hidden email]>
> > > > > >>> wrote:
> > > > > >>>>
> > > > > >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <
> > [hidden email]>
> > > > > >> wrote:
> > > > > >>>>
> > > > > >>>>> I think the new source interface would be designed to be able
> > to
> > > > > >>> leverage
> > > > > >>>>> shared state to achieve time alignment.
> > > > > >>>>> I don't think this would be possible without some kind of
> > shared
> > > > > >> state.
> > > > > >>>>>
> > > > > >>>>> The problem of tasks that are far ahead in time cannot be
> > solved
> > > > with
> > > > > >>>>> back-pressure.
> > > > > >>>>> That's because a task cannot choose from which source task it
> > > > accepts
> > > > > >>>>> events and from which doesn't.
> > > > > >>>>> If it blocks an input, all downstream tasks that are
> connected
> > to
> > > > the
> > > > > >>>>> operator are affected. This can easily lead to deadlocks.
> > > > > >>>>> Therefore, all operators need to be able to handle events
> when
> > > they
> > > > > >>> arrive.
> > > > > >>>>> If they cannot process them yet because they are too far
> ahead
> > in
> > > > > >> time,
> > > > > >>>>> they are put in state.
> > > > > >>>>>
> > > > > >>>>
> > > > > >>>> The idea I was suggesting is not for operators to block an
> > input.
> > > > > >>> Rather,
> > > > > >>>> it is that they selectively choose from which input to process
> > > > > >>>> the next message based on its timestamp, so long as there are
> > > buffered
> > > > > >>>> messages waiting to be processed.  That is a best-effort
> > alignment
> > > > > >>>> strategy.  Seems to work relatively well in practice, at least
> > > > within
> > > > > >>> Kafka
> > > > > >>>> Streams.
> > > > > >>>>
> > > > > >>>> E.g. at the moment StreamTwoInputProcessor uses a
> UnionInputGate
> > > for
> > > > > >> both
> > > > > >>>> its inputs.  Instead, it could keep them separate and
> > selectively
> > > > > >> consume
> > > > > >>>> from the one that had a buffer available, and if both have
> > buffers
> > > > > >>>> available, from the buffer with the messages with a lower
> > > timestamp.
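The selection Elias describes could be sketched roughly like this (a simplified model, not the actual StreamTwoInputProcessor): records are reduced to bare timestamps, and the processor reads from whichever input has a buffered record, preferring the lower head timestamp when both do.

```java
import java.util.Deque;

/**
 * Simplified sketch of timestamp-based two-input selection (illustrative
 * only): consume from the non-empty input; if both have buffered records,
 * take the one whose head record has the lower timestamp.
 */
public class TwoInputSelector {
    /** Returns 0 or 1 for the input to read next, or -1 if both are empty. */
    public static int selectInput(Deque<Long> input0, Deque<Long> input1) {
        if (input0.isEmpty() && input1.isEmpty()) {
            return -1;
        }
        if (input0.isEmpty()) {
            return 1;
        }
        if (input1.isEmpty()) {
            return 0;
        }
        return input0.peekFirst() <= input1.peekFirst() ? 0 : 1;
    }
}
```

This mirrors the best-effort nature of the strategy: when only one input has data, it is consumed regardless of timestamp, so no input is ever blocked.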
> > > > > >>>
> > > > > >>>
> > > > > >>
> > > > > >
> > > > >
> > > > >
> > > > >
> > > >
> > >
> >
>