Sharing state between subtasks

Sharing state between subtasks

Thomas Weise
I'm looking to implement a state sharing mechanism between subtasks (of one
or multiple tasks). Our use case is to align watermarks between subtasks of
one or multiple sources, to prevent some data fetchers from racing ahead of
others and causing massive state buffering in Flink.

Each subtask would share a small state (probably just a key and couple
longs). The state would be updated periodically (perhaps every 30s). Other
subtasks should see these changes with similar latency. It is essentially a
hash table to which every node contributes a distinct key.
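As a rough sketch of that hash table (all names here are hypothetical, not an existing Flink API): each subtask periodically publishes its key and current watermark, and any subtask can read the global minimum.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory model of the shared table: one entry per subtask,
// updated every ~30s, readable by all peers with similar latency.
class SharedWatermarkTable {
    private final Map<String, Long> watermarks = new ConcurrentHashMap<>();

    // Each subtask contributes a distinct key with its current low watermark.
    void publish(String subtaskKey, long watermark) {
        watermarks.put(subtaskKey, watermark);
    }

    // The global minimum tells a subtask how far the slowest peer has progressed.
    long globalMinWatermark() {
        return watermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }
}
```

The hard part is of course distributing this table across task managers; the in-memory version only illustrates the access pattern.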

An initial idea was to implement this using ZooKeeper ephemeral nodes. But
since there is no way to read all child nodes in one sweep, state access
becomes very chatty. With, let's say, 512 subtasks we would end up with 512
* 512 reads per interval (1 to list children, N-1 to fetch data, per
subtask).
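To make the cost concrete, the read count per interval grows quadratically with the parallelism (assuming no caching or batching on the ZooKeeper side):

```java
// Each of the n subtasks issues 1 getChildren call plus n-1 getData calls
// per interval, so the cluster as a whole issues n * n ZooKeeper reads.
class ZkReadCost {
    static long readsPerInterval(int n) {
        return (long) n * n;
    }
}
```

For n = 512 that is 262,144 reads per interval, which is why this approach becomes chatty at scale.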

My next stop will be a group communication mechanism like JGroups or Akka
(following looks like a potential good fit:
https://doc.akka.io/docs/akka/2.5/distributed-data.html?language=java). But
before that I wanted to check if others already had a similar need and
possibly experience/implementation to share?

There are probably more use cases related to discovery etc. Perhaps Flink
could provide a state primitive, if there is broader interest in the
community?

Thanks,
Thomas

Re: Sharing state between subtasks

Jamie Grier-3
I'll add to what Thomas already said. The larger issue driving this is
that when reading from a source with many parallel partitions, especially
when reading lots of historical data (or recovering from downtime when there
is a backlog to read), it's quite common for an event-time skew to develop
across those partitions.

When doing event-time windowing -- or in fact any event-time driven
processing -- the event time skew across partitions results directly in
increased buffering in Flink and of course the corresponding
state/checkpoint size growth.

As the event-time skew and state size grows larger this can have a major
effect on application performance and in some cases result in a "death
spiral" where the application performance gets worse and worse as the
state size grows and grows.

So, one solution to this problem, outside of core changes in Flink itself,
seems to be to try to coordinate sources across partitions so that they
make progress through event time at roughly the same rate.  In fact if
there is large skew the idea would be to slow or even stop reading from
some partitions with newer data while first reading the partitions with
older data.  Anyway, to do this we need to share state somehow amongst
sub-tasks.
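Under the shared-state approach, the pause decision each reader would make can be sketched as follows (the threshold name and rule are hypothetical, not an existing Flink mechanism):

```java
// Hypothetical pause rule for a source reader: stop reading a partition whose
// event time has run more than maxSkewMillis ahead of the slowest partition
// anywhere in the job (the global minimum obtained from the shared state).
class EventTimeAligner {
    static boolean shouldPause(long partitionEventTime,
                               long globalMinWatermark,
                               long maxSkewMillis) {
        return partitionEventTime - globalMinWatermark > maxSkewMillis;
    }
}
```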

The common-sense view of this is the following: why would we want to pull
data from a perfectly good buffer (like a filesystem, Kinesis, or Kafka)
into Flink state just to manage and checkpoint it while waiting to be able
to complete event-time computations? The completion of computations is
held up by the partitions with the oldest data, so it's of no value to read
the newer data until you've read the old. It seems much better to leave
the newer data buffered upstream.

I'd be very curious to hear others' thoughts on this..  I would expect many
people to have run into similar issues.  I also wonder if anybody has
already been working on similar issues.  It seems there is room for some
core Flink changes to address this as well and I'm guessing people have
already thought about it.

-Jamie



On Sun, Oct 7, 2018 at 12:58 PM Thomas Weise <[hidden email]> wrote:


Re: Sharing state between subtasks

Elias Levy
Kafka Streams handles this problem, time alignment, by processing records
from the partitions with the lowest timestamp on a best-effort basis.
See KIP-353 for the details. The same could be done within the Kafka
source and in multiple-input stream operators. I opened FLINK-4558
<https://issues.apache.org/jira/browse/FLINK-4558> a while ago regarding
this topic.
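A minimal sketch of that selection rule (simplified; the actual Kafka Streams PartitionGroup code differs): among the per-partition queues with buffered records, take from the one whose head record has the lowest timestamp.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;

// Best-effort time alignment: pick the queue whose head record carries the
// lowest timestamp. Queues hold record timestamps here for simplicity.
class LowestTimestampSelector {
    static String nextQueue(Map<String, Deque<Long>> queues) {
        String best = null;
        long bestTs = Long.MAX_VALUE;
        for (Map.Entry<String, Deque<Long>> e : queues.entrySet()) {
            Long head = e.getValue().peekFirst();
            if (head != null && head < bestTs) {
                bestTs = head;
                best = e.getKey();
            }
        }
        return best; // null when every queue is empty
    }
}
```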

On Mon, Oct 8, 2018 at 3:41 PM Jamie Grier <[hidden email]> wrote:


Re: Sharing state between subtasks

Aljoscha Krettek-2
Yes, I think this is the way to go.

This would also go well with a redesign of the source interface that has been floated for a while now. I also created a prototype a while back: https://github.com/aljoscha/flink/tree/refactor-source-interface. Just as a refresher, the redesign aims at several things:

1. Make partitions/splits explicit in the interface. Currently, the fact that there are file splits or Kafka partitions or Kinesis shards is hidden in the source implementation while it would be beneficial for the system to know of these and to be able to track watermarks for them. Currently, there is a custom implementation for per-partition watermark tracking in the Kafka Consumer that this redesign would obviate.

2. Split split/partition/shard discovery from the reading part. This would allow rebalancing work and again make the nature of sources more explicit in the interfaces.

3. Go away from the push model to a pull model. The problem with the current source interface is that the source controls the read-loop and has to get the checkpoint lock for emitting elements/updating state. If we get the loop out of the source this leaves more potential for Flink to be clever about reading from sources.

The prototype posted above defines three new interfaces: Source, SplitEnumerator, and SplitReader, along with a naive example and a working Kafka Consumer (with checkpointing, actually).
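A rough sketch of how the three interfaces could relate (the signatures here are illustrative assumptions, not the prototype's actual API):

```java
import java.util.List;

// Illustrative shapes only; the prototype linked above defines the real API.
interface SplitEnumerator<SplitT> {
    // Discovery is separated from reading (point 2 above).
    List<SplitT> discoverSplits();
}

interface SplitReader<T, SplitT> {
    void assign(SplitT split);
    boolean hasAvailable();   // pull model: the runtime drives the loop (point 3)
    T pollNext();
    long currentWatermark();  // per-split watermark visible to the system (point 1)
}

interface Source<T, SplitT> {
    SplitEnumerator<SplitT> createEnumerator();
    SplitReader<T, SplitT> createReader();
}

// Tiny in-memory reader, only to show the pull loop in action.
class ArrayReader implements SplitReader<Long, long[]> {
    private long[] data = new long[0];
    private int pos = 0;
    public void assign(long[] split) { this.data = split; this.pos = 0; }
    public boolean hasAvailable() { return pos < data.length; }
    public Long pollNext() { return data[pos++]; }
    public long currentWatermark() {
        return pos == 0 ? Long.MIN_VALUE : data[pos - 1];
    }
}
```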

If we had this source interface, along with a service for propagating watermark information, the code that reads from the splits could de-prioritise certain splits and we would get the event-time alignment behaviour for all sources that are implemented using the new interface, without requiring special code in each source implementation.

@Elias Do you know if Kafka Consumers do this alignment across multiple consumers or only within one Consumer, across the partitions that it reads from?

On 9. Oct 2018, at 00:55, Elias Levy <[hidden email]> wrote:


Re: Sharing state between subtasks

Fabian Hueske-2
Hi,

I think watermark / event-time skew is a problem that many users are
struggling with.
A built-in primitive to align event-time would be a great feature!

However, there are also some cases where it would be useful for different
streams to have diverging event time, such as an interval join [1]
(DataStream API) or a time-windowed join (SQL) that joins one stream with
events from another stream that happened between two hours and one hour earlier.
Granted, this is a very specific case and not the norm, but it might make
sense to keep it in the back of our heads when designing this feature.

Best, Fabian

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/joining.html#interval-join

On Tue, 9 Oct 2018 at 10:25, Aljoscha Krettek <[hidden email]> wrote:


Re: Sharing state between subtasks

Elias Levy
In reply to this post by Aljoscha Krettek-2
On Tue, Oct 9, 2018 at 1:25 AM Aljoscha Krettek <[hidden email]> wrote:


The behavior is part of Kafka Streams
<https://github.com/apache/kafka/blob/96132e2dbb69a0c6c11cb183bb05cefef4e30557/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java#L65>,
not the Kafka consumer. The alignment does not occur across Kafka
consumers, but that is because Kafka Streams, unlike Flink, uses a single
consumer to fetch records from multiple sources / topics. The alignment
occurs within the stream task. Stream tasks keep queues per topic-partition
(which may be from different topics) and select the next record to be
processed by picking the queue with the lowest timestamp.

The equivalent in Flink would be for the Kafka connector source to select
the message among partitions with the lowest timestamp to emit next, and
for multiple input stream operators to select the message among inputs with
the lowest timestamp to process.

Re: Sharing state between subtasks

Jamie Grier-3
Okay, so I think there is a lot of agreement here about (a) This is a real
issue for people, and (b) an ideal long-term approach to solving it.

As Aljoscha and Elias said a full solution to this would be to also
redesign the source interface such that individual partitions are exposed
in the API and not hidden inside sources like now -- then we could be much
smarter about the way we read from the individual partitions.  We would
also have to modify the stream task code such that it also reads in a
time-aligned way throughout the data flow to solve the full problem --
either that or use some shared state between sources to keep them
time-aligned across sub-tasks just at the source.

With regard to this question of state sharing between source sub-tasks
versus modifying Flink to do time-aligned reads throughout the system --
does anybody have a strong opinion on this?

We're basically looking for a way forward and our initial approach, though
ugly because it requires modification to all of the sources we use, was
going to be to share state between source sub-tasks in order to keep them
time-aligned with no further modifications required to Flink's core.

However, if it seems reasonable to do and there is consensus on the best
way forward maybe we should be looking at introducing the time-alignment
properly instead of hacking the sources.




On Tue, Oct 9, 2018 at 12:01 PM Elias Levy <[hidden email]>
wrote:


Re: Sharing state between subtasks

Jamie Grier-3
Also, I'm afraid I derailed this thread just a bit..  So also back to
Thomas's original question..

If we decide state-sharing across source subtasks is the way forward for
now -- does anybody have thoughts to share on what form this should take?

Thomas mentioned Akka or JGroups.  Other thoughts?


On Wed, Oct 10, 2018 at 6:58 AM Jamie Grier <[hidden email]> wrote:


Re: Sharing state between subtasks

Aljoscha Krettek-2
Sorry for also derailing this a bit earlier...

I think the two things (shared state and new source interface) are somewhat orthogonal. The new source interface alone doesn't solve the problem; we would still need some mechanism for sharing the event-time information between different subtasks. This could be the state sharing mechanism. I would therefore say we should not block one on the other, and should go ahead with state sharing.

With recent releases we started to abstract Akka away behind RPC interfaces, so we probably shouldn't introduce a hard dependency on Akka (or another system) again. Maybe Till (cc'ed) could shed some light on this. It might be that we just have to design a generic interface and then use Akka underneath.


On 10. Oct 2018, at 16:18, Jamie Grier <[hidden email]> wrote:


Re: Sharing state between subtasks

Elias Levy
On Wed, Oct 10, 2018 at 8:15 AM Aljoscha Krettek <[hidden email]>
wrote:


Is that really the case?  The reason Thomas gave for the request to share
state among subtasks was to implement stream alignment.  If streams can be
aligned, then the given reason for state sharing disappears.  Not to say
there aren't other situations where state sharing could be useful.  It
would have been handy in a number of our jobs.

Also, it's not clear to me that if sources (and multiple-input stream operators)
were performing time alignment, you'd need some mechanism for sharing
event-time information between subtasks. Each source and multiple-input
operator can perform its own local alignment, and back-pressure can take
care of squelching sources that are advancing too fast.

Re: Sharing state between subtasks

Fabian Hueske-2
I think the new source interface would be designed to be able to leverage
shared state to achieve time alignment.
I don't think this would be possible without some kind of shared state.

The problem of tasks that are far ahead in time cannot be solved with
back-pressure.
That's because a task cannot choose from which source task it accepts
events and from which it doesn't.
If it blocks an input, all downstream tasks that are connected to the
operator are affected. This can easily lead to deadlocks.
Therefore, all operators need to be able to handle events when they arrive.
If they cannot process them yet because they are too far ahead in time,
they are put in state.



On Wed, 10 Oct 2018 at 18:15, Elias Levy <[hidden email]> wrote:


Re: Sharing state between subtasks

Thomas Weise
Thanks for the feedback and comments so far.

I want to elaborate more on the need for the shared state and awareness of
watermark alignment in the source implementation. Sources like Kafka and
Kinesis pull from the external system and then emit the records. For
Kinesis, we have multiple consumer threads (one per shard), that fetch from
Kinesis and push the records downstream. Each of those threads logically
has a different watermark. (Note that the Kinesis source in Flink
currently does not have any source watermarking support, we have
implemented that in our own extension at Lyft).

The source needs logic to decide which threads should pause because they
are ahead. That logic needs to take into account other threads and other
subtasks (for which we need the shared state). A watermark aware back
pressure mechanism would be great, but we cannot block the entire source.
We need to only stop reading those shards (or Kafka partitions) that have
gotten too far ahead of others.

Thanks,
Thomas


On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]> wrote:


Re: Sharing state between subtasks

Elias Levy
In reply to this post by Fabian Hueske-2
On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]> wrote:


The idea I was suggesting is not for operators to block an input. Rather,
it is that they selectively choose which input to process the next
message from, based on its timestamp, so long as there are buffered
messages waiting to be processed. That is a best-effort alignment
strategy. It seems to work relatively well in practice, at least within
Kafka Streams.

E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
its inputs.  Instead, it could keep them separate and selectively consume
from the one that has a buffer available, and if both have buffers
available, from the one whose buffered messages have the lower timestamp.
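The selection rule described above can be sketched without any Flink types: given two input queues of buffered records, pick the only non-empty one, or the one whose head carries the lower timestamp. The class and method names below are illustrative assumptions, not the actual StreamTwoInputProcessor API, and single timestamps stand in for whole network buffers.

```java
import java.util.Queue;

/**
 * Illustrative sketch (not Flink's actual API) of a two-input processor
 * that prefers the input whose next buffered record has the lower timestamp.
 */
public class SelectiveTwoInputReader {

    /**
     * Returns 0 or 1 for the input to read next, or -1 if both are empty.
     * Chooses the only non-empty input; if both have buffered data,
     * chooses the one with the lower head timestamp.
     */
    public static int selectInput(Queue<Long> input0, Queue<Long> input1) {
        if (input0.isEmpty() && input1.isEmpty()) {
            return -1; // nothing buffered on either input
        }
        if (input0.isEmpty()) {
            return 1;
        }
        if (input1.isEmpty()) {
            return 0;
        }
        // Both inputs have buffered data: best-effort event-time alignment.
        return input0.peek() <= input1.peek() ? 0 : 1;
    }
}
```

Note that this never blocks an input outright: when only the "fast" input has data, it is still consumed, which is what makes the alignment best-effort.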

Re: Sharing state between subtasks

Aljoscha Krettek-2
The reason this selective reading doesn't work well in Flink at the moment is checkpointing. For checkpointing, checkpoint barriers travel within the streams. Selectively reading from inputs based on timestamps is akin to blocking an input if that input is very far ahead in event time, which can happen when you have a very fast source and a slow source (in event time), for example during a catch-up phase. In those cases it's better to simply not read the data at the sources, as Thomas said. This approach also works for Kafka Streams because there each operator is basically its own job: it reads from Kafka and writes to Kafka, and there is no complex graph of different operations with network shuffles in between, as you have with Flink.

This different nature of Flink is also why I think that readers need awareness of other readers to do the event-time alignment, and this is where shared state comes in.

> On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]> wrote:
>
> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]> wrote:
>
>> I think the new source interface would be designed to be able to leverage
>> shared state to achieve time alignment.
>> I don't think this would be possible without some kind of shared state.
>>
>> The problem of tasks that are far ahead in time cannot be solved with
>> back-pressure.
>> That's because a task cannot choose from which source task it accepts
>> events and from which doesn't.
>> If it blocks an input, all downstream tasks that are connected to the
>> operator are affected. This can easily lead to deadlocks.
>> Therefore, all operators need to be able to handle events when they arrive.
>> If they cannot process them yet because they are too far ahead in time,
>> they are put in state.
>>
>
> The idea I was suggesting is not for operators to block an input.  Rather,
> it is that they selectively choose from which input to process the next
> message from based on their timestamp, so long as there are buffered
> messages waiting to be processed.  That is a best-effort alignment
> strategy.  Seems to work relatively well in practice, at least within Kafka
> Streams.
>
> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> its inputs.  Instead, it could keep them separate and selectively consume
> from the one that had a buffer available, and if both have buffers
> available, from the buffer with the messages with a lower timestamp.


Re: Sharing state between subtasks

Till Rohrmann
But on the Kafka source level it should be perfectly fine to do what Elias
proposed. This is of course not the perfect solution but could bring us
forward quite a bit. The changes required for this should also be minimal.
This would become obsolete once we have something like shared state. But
until then, I think it would be worth a try.

Cheers,
Till

On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]>
wrote:

> The reason this selective reading doesn't work well in Flink in the moment
> is because of checkpointing. For checkpointing, checkpoint barriers travel
> within the streams. If we selectively read from inputs based on timestamps
> this is akin to blocking an input if that input is very far ahead in event
> time, which can happen when you have a very fast source and a slow source
> (in event time), maybe because you're in a catchup phase. In those cases
> it's better to simply not read the data at the sources, as Thomas said.
> This is also because with Kafka Streams, each operator is basically its own
> job: it's reading from Kafka and writing to Kafka and there is not a
> complex graph of different operations with network shuffles in between, as
> you have with Flink.
>
> This different nature of Flink is also why I think that readers need
> awareness of other readers to do the event-time alignment, and this is
> where shared state comes in.
>
> > On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
> wrote:
> >
> > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]> wrote:
> >
> >> I think the new source interface would be designed to be able to
> leverage
> >> shared state to achieve time alignment.
> >> I don't think this would be possible without some kind of shared state.
> >>
> >> The problem of tasks that are far ahead in time cannot be solved with
> >> back-pressure.
> >> That's because a task cannot choose from which source task it accepts
> >> events and from which doesn't.
> >> If it blocks an input, all downstream tasks that are connected to the
> >> operator are affected. This can easily lead to deadlocks.
> >> Therefore, all operators need to be able to handle events when they
> arrive.
> >> If they cannot process them yet because they are too far ahead in time,
> >> they are put in state.
> >>
> >
> > The idea I was suggesting is not for operators to block an input.
> Rather,
> > it is that they selectively choose from which input to process the next
> > message from based on their timestamp, so long as there are buffered
> > messages waiting to be processed.  That is a best-effort alignment
> > strategy.  Seems to work relatively well in practice, at least within
> Kafka
> > Streams.
> >
> > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for both
> > its inputs.  Instead, it could keep them separate and selectively consume
> > from the one that had a buffer available, and if both have buffers
> > available, from the buffer with the messages with a lower timestamp.
>
>

Re: Sharing state between subtasks

Jamie Grier-3
Here's a doc I started that describes some changes we would like to make,
starting with the Kinesis Source. It describes a refactoring of that code
specifically, and hopefully also a pattern and some reusable code we can use
in the other sources as well.  The end goal is best-effort event-time
synchronization across all Flink sources, but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <[hidden email]> wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course is not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]>
> wrote:
>
> > The reason this selective reading doesn't work well in Flink in the
> moment
> > is because of checkpointing. For checkpointing, checkpoint barriers
> travel
> > within the streams. If we selectively read from inputs based on
> timestamps
> > this is akin to blocking an input if that input is very far ahead in
> event
> > time, which can happen when you have a very fast source and a slow source
> > (in event time), maybe because you're in a catchup phase. In those cases
> > it's better to simply not read the data at the sources, as Thomas said.
> > This is also because with Kafka Streams, each operator is basically its
> own
> > job: it's reading from Kafka and writing to Kafka and there is not a
> > complex graph of different operations with network shuffles in between,
> as
> > you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
> > wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]>
> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > leverage
> > >> shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared
> state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > arrive.
> > >> If they cannot process them yet because they are too far ahead in
> time,
> > >> they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > Rather,
> > > it is that they selectively choose from which input to process the next
> > > message from based on their timestamp, so long as there are buffered
> > > messages waiting to be processed.  That is a best-effort alignment
> > > strategy.  Seems to work relatively well in practice, at least within
> > Kafka
> > > Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> both
> > > its inputs.  Instead, it could keep them separate and selectively
> consume
> > > from the one that had a buffer available, and if both have buffers
> > > available, from the buffer with the messages with a lower timestamp.
> >
> >
>

Re: Sharing state between subtasks

Zhijiang(wangzhijiang999)
Just noticed this discussion from @Till Rohrmann's weekly community update, and I want to share some thoughts from our experiences.

We also encountered the source consumption skew issue before, and we focused on improving it in two possible ways.

1. Control the read strategy on the downstream side. In detail, every input channel in a downstream task corresponds to the consumption of one upstream source task, and we tag each input channel with its watermark to find the lowest channel to read with high priority. In essence, we actually rely on the mechanism of backpressure. If the channel with the highest timestamp is not read by the downstream task for a while, it will block the corresponding source task from reading once the buffers are exhausted. There is no need to change the source interface in this way, but there are two major concerns: first, it affects the barrier alignment, resulting in checkpoints being delayed or expired. Second, it cannot align source consumption very precisely; it is just a best-effort approach. So we gave up on this way in the end.

2. Add a new SourceCoordinator component to coordinate the source consumption in a distributed way. For example, we can start this component in the JobManager, like the current role of the CheckpointCoordinator. Then every source task would communicate with the JobManager via the current RPC mechanism; maybe we can rely on the heartbeat message to attach the consumption progress as the payload. The JobManager will accumulate all the reported progress and then give responses to the different source tasks. We can define a protocol for indicating that a fast source task should sleep for a specific time, for example. This way, the coordinator has the global information to make the proper decision for individual tasks, so it seems more precise. And it will not affect the barrier alignment, because the sleeping fast source can still release the lock to emit barriers as normal. The only concern is the changes to the source interface, which may affect all related source implementations.

Currently we prefer to implement the second approach and will incorporate the other good points above. :)
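The coordination protocol of the second approach can be illustrated with a minimal sketch. The class name SourceCoordinator, the heartbeat-driven reportProgress method, and the fixed skew threshold are all assumptions made for this example, not existing Flink APIs: subtasks report their event-time progress, and any subtask that is too far ahead of the slowest one is told to sleep.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch of the proposed SourceCoordinator idea: source
 * subtasks attach their event-time progress to heartbeats, and the
 * coordinator tells subtasks that are too far ahead of the slowest
 * reported watermark to pause for a while.
 */
public class SourceCoordinator {

    /** Latest reported watermark per source subtask index. */
    private final Map<Integer, Long> progress = new HashMap<>();

    /** Maximum allowed event-time skew before a subtask is paused. */
    private final long maxAheadMillis;

    public SourceCoordinator(long maxAheadMillis) {
        this.maxAheadMillis = maxAheadMillis;
    }

    /**
     * Called on each subtask heartbeat with that subtask's current
     * watermark. Returns true if the subtask should sleep because it is
     * more than maxAheadMillis ahead of the slowest subtask seen so far.
     */
    public synchronized boolean reportProgress(int subtaskIndex, long watermark) {
        progress.put(subtaskIndex, watermark);
        long min = progress.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .getAsLong(); // map is non-empty after the put above
        return watermark - min > maxAheadMillis;
    }
}
```

Because the decision to sleep is made between records rather than by blocking the network channel, a paused source can still emit checkpoint barriers normally, which is the key advantage over the backpressure-based approach.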

Best,
Zhijiang
------------------------------------------------------------------
From: Jamie Grier <[hidden email]>
Sent: Wednesday, October 17, 2018 03:28
To: dev <[hidden email]>
Subject: Re: Sharing state between subtasks

Here's a doc I started describing some changes we would like to make
starting with the Kinesis Source.. It describes a refactoring of that code
specifically and also hopefully a pattern and some reusable code we can use
in the other sources as well.  The end goal would be best-effort event-time
synchronization across all Flink sources but we are going to start with the
Kinesis Source first.

Please take a look and please provide thoughts and opinions about the best
state sharing mechanism to use -- that section is left blank and we're
especially looking for input there.

https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing

-Jamie


On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <[hidden email]> wrote:

> But on the Kafka source level it should be perfectly fine to do what Elias
> proposed. This is of course is not the perfect solution but could bring us
> forward quite a bit. The changes required for this should also be minimal.
> This would become obsolete once we have something like shared state. But
> until then, I think it would worth a try.
>
> Cheers,
> Till
>
> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]>
> wrote:
>
> > The reason this selective reading doesn't work well in Flink in the
> moment
> > is because of checkpointing. For checkpointing, checkpoint barriers
> travel
> > within the streams. If we selectively read from inputs based on
> timestamps
> > this is akin to blocking an input if that input is very far ahead in
> event
> > time, which can happen when you have a very fast source and a slow source
> > (in event time), maybe because you're in a catchup phase. In those cases
> > it's better to simply not read the data at the sources, as Thomas said.
> > This is also because with Kafka Streams, each operator is basically its
> own
> > job: it's reading from Kafka and writing to Kafka and there is not a
> > complex graph of different operations with network shuffles in between,
> as
> > you have with Flink.
> >
> > This different nature of Flink is also why I think that readers need
> > awareness of other readers to do the event-time alignment, and this is
> > where shared state comes in.
> >
> > > On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
> > wrote:
> > >
> > > On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]>
> wrote:
> > >
> > >> I think the new source interface would be designed to be able to
> > leverage
> > >> shared state to achieve time alignment.
> > >> I don't think this would be possible without some kind of shared
> state.
> > >>
> > >> The problem of tasks that are far ahead in time cannot be solved with
> > >> back-pressure.
> > >> That's because a task cannot choose from which source task it accepts
> > >> events and from which doesn't.
> > >> If it blocks an input, all downstream tasks that are connected to the
> > >> operator are affected. This can easily lead to deadlocks.
> > >> Therefore, all operators need to be able to handle events when they
> > arrive.
> > >> If they cannot process them yet because they are too far ahead in
> time,
> > >> they are put in state.
> > >>
> > >
> > > The idea I was suggesting is not for operators to block an input.
> > Rather,
> > > it is that they selectively choose from which input to process the next
> > > message from based on their timestamp, so long as there are buffered
> > > messages waiting to be processed.  That is a best-effort alignment
> > > strategy.  Seems to work relatively well in practice, at least within
> > Kafka
> > > Streams.
> > >
> > > E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> both
> > > its inputs.  Instead, it could keep them separate and selectively
> consume
> > > from the one that had a buffer available, and if both have buffers
> > > available, from the buffer with the messages with a lower timestamp.
> >
> >
>


Re: Sharing state between subtasks

Aljoscha Krettek-2
Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <[hidden email]> wrote:
>
> Just noticed this discussion from @Till Rohrmann's weekly community update and I want to share some thoughts from our experiences.
>
> We also encountered the source consuption skew issue before, and we are focused on improving this by two possible ways.
>
> 1. Control the read strategy by the downstream side. In detail, every input channel in downstream task corresponds to the consumption of one upstream source task, and we will tag each input channel with watermark to find the lowest channel to read in high priority. In essence, we actually rely on the mechanism of backpressure. If the channel with highest timestamp is not read by downstream task for a while, it will block the corresponding source task to read when the buffers are exhausted. It is no need to change the source interface in this way, but there are two major concerns: first it will affect the barier alignment resulting in checkpoint delayed or expired. Second it can not confirm source consumption alignment very precisely, and it is just a best effort way. So we gave up this way finally.
>
> 2. Add the new component of SourceCoordinator to coordinate the source consumption distributedly. For example we can start this componnet in the JobManager like the current role of CheckpointCoordinator. Then every source task would commnicate with JobManager via current RPC mechanism, maybe we can rely on the heartbeat message to attach the consumption progress as the payloads. The JobManagerwill accumulator or state all the reported progress and then give responses for different source tasks. We can define a protocol for indicating the fast soruce task to sleep for specific time for example. To do so, the coordinator has the global informations to give the proper decision for individuals, so it seems more precise. And it will not affect the barrier alignment, because the sleeping fast source can release the lock to emit barrier as normal. The only concern is the changes for source interface and may affect all related source implementations.
>
> Currently we prefer to the second way to implement and will refer to other good points above. :)
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Jamie Grier <[hidden email]>
> Sent: Wednesday, October 17, 2018 03:28
> To: dev <[hidden email]>
> Subject: Re: Sharing state between subtasks
>
> Here's a doc I started describing some changes we would like to make
> starting with the Kinesis Source.. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well.  The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
>
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
>
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
>
> -Jamie
>
>
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <[hidden email]> wrote:
>
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course is not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would worth a try.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]>
>> wrote:
>>
>>> The reason this selective reading doesn't work well in Flink in the
>> moment
>>> is because of checkpointing. For checkpointing, checkpoint barriers
>> travel
>>> within the streams. If we selectively read from inputs based on
>> timestamps
>>> this is akin to blocking an input if that input is very far ahead in
>> event
>>> time, which can happen when you have a very fast source and a slow source
>>> (in event time), maybe because you're in a catchup phase. In those cases
>>> it's better to simply not read the data at the sources, as Thomas said.
>>> This is also because with Kafka Streams, each operator is basically its
>> own
>>> job: it's reading from Kafka and writing to Kafka and there is not a
>>> complex graph of different operations with network shuffles in between,
>> as
>>> you have with Flink.
>>>
>>> This different nature of Flink is also why I think that readers need
>>> awareness of other readers to do the event-time alignment, and this is
>>> where shared state comes in.
>>>
>>>> On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
>>> wrote:
>>>>
>>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]>
>> wrote:
>>>>
>>>>> I think the new source interface would be designed to be able to
>>> leverage
>>>>> shared state to achieve time alignment.
>>>>> I don't think this would be possible without some kind of shared
>> state.
>>>>>
>>>>> The problem of tasks that are far ahead in time cannot be solved with
>>>>> back-pressure.
>>>>> That's because a task cannot choose from which source task it accepts
>>>>> events and from which doesn't.
>>>>> If it blocks an input, all downstream tasks that are connected to the
>>>>> operator are affected. This can easily lead to deadlocks.
>>>>> Therefore, all operators need to be able to handle events when they
>>> arrive.
>>>>> If they cannot process them yet because they are too far ahead in
>> time,
>>>>> they are put in state.
>>>>>
>>>>
>>>> The idea I was suggesting is not for operators to block an input.
>>> Rather,
>>>> it is that they selectively choose from which input to process the next
>>>> message from based on their timestamp, so long as there are buffered
>>>> messages waiting to be processed.  That is a best-effort alignment
>>>> strategy.  Seems to work relatively well in practice, at least within
>>> Kafka
>>>> Streams.
>>>>
>>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
>> both
>>>> its inputs.  Instead, it could keep them separate and selectively
>> consume
>>>> from the one that had a buffer available, and if both have buffers
>>>> available, from the buffer with the messages with a lower timestamp.
>>>
>>>
>>
>


Re: Sharing state between subtasks

Zhijiang(wangzhijiang999)
Not yet. We only have some initial thoughts and have not worked on it yet. We will update the progress in this discussion if we have any.

Best,
Zhijiang
------------------------------------------------------------------
From: Aljoscha Krettek <[hidden email]>
Sent: Thursday, October 18, 2018 17:53
To: dev <[hidden email]>; Zhijiang(wangzhijiang999) <[hidden email]>
Cc: Till Rohrmann <[hidden email]>
Subject: Re: Sharing state between subtasks

Hi Zhijiang,

do you already have working code or a design doc for the second approach?

Best,
Aljoscha

> On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <[hidden email]> wrote:
>
> Just noticed this discussion from @Till Rohrmann's weekly community update and I want to share some thoughts from our experiences.
>
> We also encountered the source consuption skew issue before, and we are focused on improving this by two possible ways.
>
> 1. Control the read strategy by the downstream side. In detail, every input channel in downstream task corresponds to the consumption of one upstream source task, and we will tag each input channel with watermark to find the lowest channel to read in high priority. In essence, we actually rely on the mechanism of backpressure. If the channel with highest timestamp is not read by downstream task for a while, it will block the corresponding source task to read when the buffers are exhausted. It is no need to change the source interface in this way, but there are two major concerns: first it will affect the barier alignment resulting in checkpoint delayed or expired. Second it can not confirm source consumption alignment very precisely, and it is just a best effort way. So we gave up this way finally.
>
> 2. Add the new component of SourceCoordinator to coordinate the source consumption distributedly. For example we can start this componnet in the JobManager like the current role of CheckpointCoordinator. Then every source task would commnicate with JobManager via current RPC mechanism, maybe we can rely on the heartbeat message to attach the consumption progress as the payloads. The JobManagerwill accumulator or state all the reported progress and then give responses for different source tasks. We can define a protocol for indicating the fast soruce task to sleep for specific time for example. To do so, the coordinator has the global informations to give the proper decision for individuals, so it seems more precise. And it will not affect the barrier alignment, because the sleeping fast source can release the lock to emit barrier as normal. The only concern is the changes for source interface and may affect all related source implementations.
>
> Currently we prefer to the second way to implement and will refer to other good points above. :)
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Jamie Grier <[hidden email]>
> Sent: Wednesday, October 17, 2018 03:28
> To: dev <[hidden email]>
> Subject: Re: Sharing state between subtasks
>
> Here's a doc I started describing some changes we would like to make
> starting with the Kinesis Source.. It describes a refactoring of that code
> specifically and also hopefully a pattern and some reusable code we can use
> in the other sources as well.  The end goal would be best-effort event-time
> synchronization across all Flink sources but we are going to start with the
> Kinesis Source first.
>
> Please take a look and please provide thoughts and opinions about the best
> state sharing mechanism to use -- that section is left blank and we're
> especially looking for input there.
>
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
>
> -Jamie
>
>
> On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <[hidden email]> wrote:
>
>> But on the Kafka source level it should be perfectly fine to do what Elias
>> proposed. This is of course is not the perfect solution but could bring us
>> forward quite a bit. The changes required for this should also be minimal.
>> This would become obsolete once we have something like shared state. But
>> until then, I think it would worth a try.
>>
>> Cheers,
>> Till
>>
>> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]>
>> wrote:
>>
>>> The reason this selective reading doesn't work well in Flink in the
>> moment
>>> is because of checkpointing. For checkpointing, checkpoint barriers
>> travel
>>> within the streams. If we selectively read from inputs based on
>> timestamps
>>> this is akin to blocking an input if that input is very far ahead in
>> event
>>> time, which can happen when you have a very fast source and a slow source
>>> (in event time), maybe because you're in a catchup phase. In those cases
>>> it's better to simply not read the data at the sources, as Thomas said.
>>> This is also because with Kafka Streams, each operator is basically its
>> own
>>> job: it's reading from Kafka and writing to Kafka and there is not a
>>> complex graph of different operations with network shuffles in between,
>> as
>>> you have with Flink.
>>>
>>> This different nature of Flink is also why I think that readers need
>>> awareness of other readers to do the event-time alignment, and this is
>>> where shared state comes in.
>>>
>>>> On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
>>> wrote:
>>>>
>>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]>
>> wrote:
>>>>
>>>>> I think the new source interface would be designed to be able to
>>> leverage
>>>>> shared state to achieve time alignment.
>>>>> I don't think this would be possible without some kind of shared
>> state.
>>>>>
>>>>> The problem of tasks that are far ahead in time cannot be solved with
>>>>> back-pressure.
>>>>> That's because a task cannot choose from which source task it accepts
>>>>> events and from which doesn't.
>>>>> If it blocks an input, all downstream tasks that are connected to the
>>>>> operator are affected. This can easily lead to deadlocks.
>>>>> Therefore, all operators need to be able to handle events when they
>>> arrive.
>>>>> If they cannot process them yet because they are too far ahead in
>> time,
>>>>> they are put in state.
>>>>>
>>>>
>>>> The idea I was suggesting is not for operators to block an input.
>>> Rather,
>>>> it is that they selectively choose from which input to process the next
>>>> message from based on their timestamp, so long as there are buffered
>>>> messages waiting to be processed.  That is a best-effort alignment
>>>> strategy.  Seems to work relatively well in practice, at least within
>>> Kafka
>>>> Streams.
>>>>
>>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
>> both
>>>> its inputs.  Instead, it could keep them separate and selectively
>> consume
>>>> from the one that had a buffer available, and if both have buffers
>>>> available, from the buffer with the messages with a lower timestamp.
>>>
>>>
>>
>



Re: Sharing state between subtasks

Thomas Weise
Hi,

We are planning to work on the Kinesis consumer in the following order:

1. Add per shard watermarking:
https://issues.apache.org/jira/browse/FLINK-5697 - this will be code we
already use internally; I will open a PR to add it to the Flink Kinesis
consumer
2. Exchange of per subtask watermarks between all subtasks of one or
multiple sources
3. Implement the queue approach described in Jamie's document, utilizing 1.)
and 2.) to align the shard consumers with respect to event time
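A minimal sketch of the per-shard bookkeeping behind steps 1.) and 3.) follows. The names and the lookahead threshold are assumptions for illustration, not the actual Kinesis consumer code: each shard advances its own watermark, the subtask watermark is the minimum across shards, and a shard consumer pauses once it runs too far ahead of that minimum.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative sketch (assumed names, not the actual Flink Kinesis
 * consumer code): per-shard watermark tracking within one source subtask,
 * with a simple rule for pausing shard consumers that race ahead.
 */
public class PerShardWatermarks {

    /** Highest event timestamp seen per shard (a simple watermark proxy). */
    private final Map<String, Long> shardWatermarks = new HashMap<>();

    /** How far a shard may run ahead of the slowest shard before pausing. */
    private final long maxLookaheadMillis;

    public PerShardWatermarks(long maxLookaheadMillis) {
        this.maxLookaheadMillis = maxLookaheadMillis;
    }

    /** Advance the given shard's watermark from an incoming record. */
    public void onRecord(String shardId, long eventTimestamp) {
        shardWatermarks.merge(shardId, eventTimestamp, Math::max);
    }

    /** Subtask watermark = minimum over all shard watermarks. */
    public long subtaskWatermark() {
        return shardWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
    }

    /** Whether this shard's consumer should pause so slower shards catch up. */
    public boolean shouldPause(String shardId) {
        long shardWm = shardWatermarks.getOrDefault(shardId, Long.MIN_VALUE);
        return shardWm - subtaskWatermark() > maxLookaheadMillis;
    }
}
```

Step 2.) would then exchange each subtask's subtaskWatermark() between subtasks, so that shouldPause could compare against a global rather than a local minimum.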

There was some discussion regarding the mechanism to share the watermarks
between subtasks. If there is something that can be re-used it would be
great. Otherwise I'm going to further investigate the Akka or JGroups
route. Regarding Akka, since it is used within Flink already, is there an
abstraction that you would recommend considering to avoid a direct dependency?

Thanks,
Thomas



On Thu, Oct 18, 2018 at 8:34 PM Zhijiang(wangzhijiang999)
<[hidden email]> wrote:

> Not yet. We only have some initial thoughts and have not worked on it yet.
> We will update the progress in this discussion if have.
>
> Best,
> Zhijiang
> ------------------------------------------------------------------
> From: Aljoscha Krettek <[hidden email]>
> Sent: Thursday, October 18, 2018 17:53
> To: dev <[hidden email]>; Zhijiang(wangzhijiang999) <
> [hidden email]>
> Cc: Till Rohrmann <[hidden email]>
> Subject: Re: Sharing state between subtasks
>
> Hi Zhijiang,
>
> do you already have working code or a design doc for the second approach?
>
> Best,
> Aljoscha
>
> > On 18. Oct 2018, at 08:42, Zhijiang(wangzhijiang999) <
> [hidden email]> wrote:
> >
> > Just noticed this discussion from @Till Rohrmann's weekly community
> update and I want to share some thoughts from our experiences.
> >
> > We also encountered the source consuption skew issue before, and we are
> focused on improving this by two possible ways.
> >
> > 1. Control the read strategy from the downstream side. In detail, every
> input channel in the downstream task corresponds to the consumption of one
> upstream source task, and we tag each input channel with a watermark so
> that the channel with the lowest watermark is read with high priority. In
> essence, we rely on the backpressure mechanism: if the channel with the
> highest timestamp is not read by the downstream task for a while, the
> corresponding source task is blocked from reading once its buffers are
> exhausted. There is no need to change the source interface this way, but
> there are two major concerns: first, it affects barrier alignment,
> resulting in delayed or expired checkpoints; second, it cannot align
> source consumption very precisely and is only a best-effort approach. So
> we gave up on this way in the end.
> >
> > 2. Add a new SourceCoordinator component to coordinate source
> consumption in a distributed fashion. For example, we can start this
> component in the JobManager, similar to the current role of the
> CheckpointCoordinator. Every source task would then communicate with the
> JobManager via the existing RPC mechanism; we might rely on the heartbeat
> messages and attach the consumption progress as payload. The JobManager
> would accumulate all the reported progress and then give responses to the
> individual source tasks. We can define a protocol, for example one that
> tells a fast source task to sleep for a specific time. Because the
> coordinator has the global information needed to make proper decisions
> for individual tasks, this seems more precise. And it does not affect
> barrier alignment, because a sleeping fast source can still release the
> lock to emit barriers as normal. The only concern is the change to the
> source interface, which may affect all related source implementations.
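
A rough sketch of the coordinator-side logic described in 2. above could look like this (illustrative Java, not an actual Flink component; the heartbeat plumbing, RPC layer, and the choice of skew/sleep parameters are all assumptions):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical coordinator: source tasks attach their event-time progress
 * to heartbeats; the coordinator answers each heartbeat with a sleep
 * duration for tasks that have run too far ahead of the slowest task.
 */
class SourceCoordinator {
    private final Map<Integer, Long> progress = new HashMap<>();
    private final long maxSkewMillis;
    private final long sleepMillis;

    SourceCoordinator(long maxSkewMillis, long sleepMillis) {
        this.maxSkewMillis = maxSkewMillis;
        this.sleepMillis = sleepMillis;
    }

    /** Handle one heartbeat payload; return how long the task should sleep. */
    synchronized long onHeartbeat(int taskIndex, long eventTimeProgress) {
        progress.put(taskIndex, eventTimeProgress);
        long slowest = progress.values().stream()
                .mapToLong(Long::longValue).min().getAsLong();
        return eventTimeProgress - slowest > maxSkewMillis ? sleepMillis : 0L;
    }
}
```

Since the slowest task always gets a sleep time of zero, barriers keep flowing, which matches the claim above that this approach does not interfere with barrier alignment.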
> >
> > Currently we prefer the second way and will take the other good points
> above into account when implementing it. :)
> >
> > Best,
> > Zhijiang
> > ------------------------------------------------------------------
> > From: Jamie Grier <[hidden email]>
> > Sent: Wednesday, October 17, 2018, 03:28
> > To: dev <[hidden email]>
> > Subject: Re: Sharing state between subtasks
> >
> > Here's a doc I started describing some changes we would like to make
> > starting with the Kinesis Source.. It describes a refactoring of that
> code
> > specifically and also hopefully a pattern and some reusable code we can
> use
> > in the other sources as well.  The end goal would be best-effort
> event-time
> > synchronization across all Flink sources but we are going to start with
> the
> > Kinesis Source first.
> >
> > Please take a look and please provide thoughts and opinions about the
> best
> > state sharing mechanism to use -- that section is left blank and we're
> > especially looking for input there.
> >
> >
> https://docs.google.com/document/d/13HxfsucoN7aUls1FX202KFAVZ4IMLXEZWyRfZNd6WFM/edit?usp=sharing
> >
> > -Jamie
> >
> >
> > On Wed, Oct 10, 2018 at 11:57 PM Till Rohrmann <[hidden email]>
> wrote:
> >
> >> But on the Kafka source level it should be perfectly fine to do what
> >> Elias proposed. This of course is not the perfect solution but could
> >> bring us forward quite a bit. The changes required for this should
> >> also be minimal. This would become obsolete once we have something
> >> like shared state. But until then, I think it would be worth a try.
> >>
> >> Cheers,
> >> Till
> >>
> >> On Thu, Oct 11, 2018 at 8:03 AM Aljoscha Krettek <[hidden email]>
> >> wrote:
> >>
> >>> The reason this selective reading doesn't work well in Flink at the
> >>> moment
> >>> is because of checkpointing. For checkpointing, checkpoint barriers
> >> travel
> >>> within the streams. If we selectively read from inputs based on
> >> timestamps
> >>> this is akin to blocking an input if that input is very far ahead in
> >> event
> >>> time, which can happen when you have a very fast source and a slow
> source
> >>> (in event time), maybe because you're in a catchup phase. In those
> cases
> >>> it's better to simply not read the data at the sources, as Thomas said.
> >>> This is also because with Kafka Streams, each operator is basically its
> >> own
> >>> job: it's reading from Kafka and writing to Kafka and there is not a
> >>> complex graph of different operations with network shuffles in between,
> >> as
> >>> you have with Flink.
> >>>
> >>> This different nature of Flink is also why I think that readers need
> >>> awareness of other readers to do the event-time alignment, and this is
> >>> where shared state comes in.
> >>>
> >>>> On 10. Oct 2018, at 20:47, Elias Levy <[hidden email]>
> >>> wrote:
> >>>>
> >>>> On Wed, Oct 10, 2018 at 9:33 AM Fabian Hueske <[hidden email]>
> >> wrote:
> >>>>
> >>>>> I think the new source interface would be designed to be able to
> >>> leverage
> >>>>> shared state to achieve time alignment.
> >>>>> I don't think this would be possible without some kind of shared
> >> state.
> >>>>>
> >>>>> The problem of tasks that are far ahead in time cannot be solved with
> >>>>> back-pressure.
> >>>>> That's because a task cannot choose from which source task it accepts
> >>>>> events and from which doesn't.
> >>>>> If it blocks an input, all downstream tasks that are connected to the
> >>>>> operator are affected. This can easily lead to deadlocks.
> >>>>> Therefore, all operators need to be able to handle events when they
> >>> arrive.
> >>>>> If they cannot process them yet because they are too far ahead in
> >> time,
> >>>>> they are put in state.
> >>>>>
> >>>>
> >>>> The idea I was suggesting is not for operators to block an input.
> >>> Rather,
> >>>> it is that they selectively choose from which input to process the
> next
> >>>> message from based on their timestamp, so long as there are buffered
> >>>> messages waiting to be processed.  That is a best-effort alignment
> >>>> strategy.  Seems to work relatively well in practice, at least within
> >>> Kafka
> >>>> Streams.
> >>>>
> >>>> E.g. at the moment StreamTwoInputProcessor uses a UnionInputGate for
> >> both
> >>>> its inputs.  Instead, it could keep them separate and selectively
> >> consume
> >>>> from the one that had a buffer available, and if both have buffers
> >>>> available, from the buffer with the messages with a lower timestamp.
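
The selection logic described in the previous paragraph could be sketched roughly like this (simplified stand-ins for Flink's input gates and stream records; the types and names are illustrative, not the actual `StreamTwoInputProcessor` internals):

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Simplified stand-in for a timestamped stream record. */
class StampedRecord {
    final long timestamp;
    StampedRecord(long timestamp) { this.timestamp = timestamp; }
}

/**
 * Best-effort event-time alignment for a two-input operator: keep the two
 * inputs separate and, when both have buffered records, consume from the
 * one whose head carries the lower timestamp.
 */
class TwoInputSelector {
    /** Return the input index (0 or 1) to read next; -1 if both are empty. */
    static int selectInput(Queue<StampedRecord> in0, Queue<StampedRecord> in1) {
        if (in0.isEmpty() && in1.isEmpty()) return -1;
        if (in0.isEmpty()) return 1;
        if (in1.isEmpty()) return 0;
        return in0.peek().timestamp <= in1.peek().timestamp ? 0 : 1;
    }
}
```

Note that whenever one buffer is empty the other is consumed regardless of timestamp, which is what makes this a best-effort strategy rather than a blocking one.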
> >>>
> >>>
> >>
> >
>
>
>