Kinesis consumer shard skew - FLINK-8516

Kinesis consumer shard skew - FLINK-8516

Thomas Weise
Hi,

We have noticed an uneven distribution of shards over subtasks after
re-sharding. We believe our use case can be addressed by sorting the
shards and assigning them to subtasks by index, with some caveats.
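
For concreteness, that sort-and-index idea could be sketched roughly like
this (illustrative only; the class and method names below are not Flink API):

import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class SortedIndexAssignmentSketch {

    /** Returns the shard ids the given subtask would read, out of all discovered shards. */
    public static List<String> shardsForSubtask(
            List<String> discoveredShardIds, int subtaskIndex, int parallelism) {
        List<String> sorted = discoveredShardIds.stream()
                .sorted(Comparator.naturalOrder())
                .collect(Collectors.toList());
        // Shard at sorted position i goes to subtask (i % parallelism): balanced,
        // but only deterministic if every subtask sees exactly the same shard list.
        return IntStream.range(0, sorted.size())
                .filter(i -> i % parallelism == subtaskIndex)
                .mapToObj(sorted::get)
                .collect(Collectors.toList());
    }
}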

The main problem is that the shard-subtask mapping won't be
deterministic, while the current hash-based solution is deterministic (but
causes skew).

Possibly the trade-off will be difficult to overcome (for a generalized
solution) without centralizing the shard assignment, which would in turn
require something like side inputs.

Any opinions on this? Would it be acceptable to make changes to the
existing operator that make the shard assignment logic and hashing easier
to customize?

Thanks,
Thomas

Re: Kinesis consumer shard skew - FLINK-8516

Tzu-Li (Gordon) Tai
Hi Thomas,

Yes, you are right that sorting the shards and then assigning them to subtasks by index would not yield a deterministic assignment.
Non-deterministic assignments would cause issues when restoring the consumer state.

Regarding centralizing shard assignment: there actually has been ongoing discussion about centralizing shard / partition discovery, which should also be able to resolve this issue.

For now, a general solution would be to move away from distributing shards over subtasks in a round-robin fashion and instead use simple uniform hashing.
This would avoid serious skew in specific Kinesis rescaling scenarios compared to the current solution, but in cases where Kinesis streams were never resharded at all, we might not have a perfect distribution.

> Any opinions on this? Would it be acceptable to make changes to the
> existing operator that make the shard assignment logic and hashing easier
> to customize?

Making the assignment logic pluggable actually sounds like a good overall solution.
The shard state in the FlinkKinesisConsumer is a Union list state, meaning that all consumer subtasks will see all shard states on restore.
This should allow us to have different shard assignment logic when restoring.
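
To make that concrete, here is a rough sketch of the restore-time idea (not the actual consumer code; the Map-based state shape and the names below are made up for illustration):

import java.util.HashMap;
import java.util.Map;

public class UnionStateRestoreSketch {

    /**
     * allCheckpointedProgress: shardId -> last checkpointed sequence number, visible to
     * every subtask on restore because the shard state is a union list state.
     */
    public static Map<String, String> selectShardsForThisSubtask(
            Map<String, String> allCheckpointedProgress,
            int mySubtaskIndex,
            int parallelism) {
        Map<String, String> mine = new HashMap<>();
        for (Map.Entry<String, String> entry : allCheckpointedProgress.entrySet()) {
            // The assignment function applied here may differ from the one used before the
            // restore, because progress is checkpointed per shard, not per subtask.
            int target = Math.floorMod(hashShard(entry.getKey()), parallelism);
            if (target == mySubtaskIndex) {
                mine.put(entry.getKey(), entry.getValue());
            }
        }
        return mine;
    }

    private static int hashShard(String shardId) {
        return shardId.hashCode(); // placeholder for whatever assignment hashing is plugged in
    }
}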

Cc’ing also Aljoscha on this topic.

Cheers,
Gordon


Re: Kinesis consumer shard skew - FLINK-8516

Thomas Weise
> Yes, you are right that sorting and then assigning shard-subtask mappings
> would not have deterministic assignment.
> Non-deterministic assignments would cause issues when restoring the
> consumer state.
>

Isn't the issue that some shard assignments may not have been checkpointed
and so may end up in different subtasks when they are re-discovered?


> For now, a general solution would be to move away from distributing shards
> over subtasks in a round-robin fashion, but just use simple uniform hashing.
> This would avoid serious skew in specific Kinesis rescaling scenarios
> compared to the current solution, but in cases where Kinesis streams
> weren’t sharded at all, we would maybe not have a perfect distribution.
>

Below are a few examples of distribution with variation in hash function.

(Each result is a map: numberOfShards -> numberOfSubtasksWithThatManyShards)

#1 current hashing:

int hash = 17;
hash = 37 * hash + streamName.hashCode();
hash = 37 * hash + shardId.hashCode();

{0=21, 1=8, 2=2, 3=4, 4=3, 5=2, 6=5, 7=2, 8=5, 9=3, 10=3, 11=3, 12=3}

#2  Hashing.consistentHash

int hash = Hashing.consistentHash(shardId.hashCode(),
totalNumberOfConsumerSubtasks);
{0=1, 1=3, 2=9, 3=18, 4=11, 5=8, 6=7, 7=4, 8=2, 11=1}

#3 Hashing.murmur3_32()

HashFunction hf = Hashing.murmur3_32();
int hash = hf.hashUnencodedChars(shardId).asInt();

{0=2, 1=5, 2=11, 3=9, 4=12, 5=12, 6=7, 8=4, 10=2}

#2 isn't perfect, but it is closer to where we would like to be. And since there
is no silver bullet, the user should be able to override the hashing.
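
For reference, a standalone harness along the following lines can reproduce this kind of comparison (the stream name, shard ids, and subtask count are made-up placeholders, and Guava's Hashing is assumed on the classpath; real skew depends on the actual, non-contiguous shard ids left behind after resharding):

import com.google.common.hash.HashFunction;
import com.google.common.hash.Hashing;

import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.ToIntFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ShardDistributionCheck {

    public static void main(String[] args) {
        int subtasks = 64;                      // assumed consumer parallelism
        String streamName = "example-stream";   // assumed stream name
        List<String> shardIds = IntStream.range(0, 256)
                .mapToObj(i -> String.format("shardId-%012d", i))
                .collect(Collectors.toList());

        System.out.println("#1 current:        " + histogram(shardIds, subtasks,
                id -> 37 * (37 * 17 + streamName.hashCode()) + id.hashCode()));
        System.out.println("#2 consistentHash: " + histogram(shardIds, subtasks,
                id -> Hashing.consistentHash(id.hashCode(), subtasks)));
        HashFunction murmur = Hashing.murmur3_32();
        System.out.println("#3 murmur3_32:     " + histogram(shardIds, subtasks,
                id -> murmur.hashUnencodedChars(id).asInt()));
    }

    /** numberOfShards -> numberOfSubtasksWithThatManyShards for one hashing scheme. */
    private static Map<Integer, Long> histogram(
            List<String> shardIds, int subtasks, ToIntFunction<String> hash) {
        int[] shardsPerSubtask = new int[subtasks];
        for (String id : shardIds) {
            // floorMod instead of Math.abs(hash % n) to sidestep the Integer.MIN_VALUE edge case
            shardsPerSubtask[Math.floorMod(hash.applyAsInt(id), subtasks)]++;
        }
        return IntStream.of(shardsPerSubtask).boxed()
                .collect(Collectors.groupingBy(n -> n, TreeMap::new, Collectors.counting()));
    }
}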


> The shard state in the FlinkKinesisConsumer is a Union list state, meaning
> that all consumer subtasks will see all shard states on restore.
> This should allow us to have different shard assignment logic when
> restoring.
>

Do you have a scenario in mind where we would not want to retain
checkpointed assignments?


Thanks,
Thomas

Re: Kinesis consumer shard skew - FLINK-8516

Tzu-Li (Gordon) Tai
> Isn't the issue that some shard assignments may not have been checkpointed 
> and so may end up in different subtasks when they are re-discovered? 

That is part of the problem, yes. For example, it would also be problematic for shard discovery.
If the assignment is non-deterministic, a source subtask could end up reading a shard that was already picked up by another source subtask, resulting in duplicate reads.

In general, for the Kinesis consumer (as well as Kafka), shard-to-source assignment is static during an execution of a job, and a deterministic assignment assures that each shard (including newly discovered shards) is assigned to exactly one source subtask.

> Below are a few examples of distribution with variation in hash function ...


Exactly, and that’s why I agree that it would make sense to support pluggable assignment hashing for the consumer. This is especially true for the Kinesis consumer, because the exact pattern of 




Re: Kinesis consumer shard skew - FLINK-8516

Tzu-Li (Gordon) Tai
(Sorry, I accidentally sent out my un-finished reply too early. Here’s the full reply.)

> Isn't the issue that some shard assignments may not have been checkpointed 
> and so may end up in different subtasks when they are re-discovered? 

That is part of the problem, yes. For example, it would also be problematic for shard discovery.
If the assignment is non-deterministic, a source subtask could end up reading a shard that was already picked up by another source subtask, resulting in duplicate reads.

In general, for the Kinesis consumer (as well as Kafka), shard-to-source assignment is static during an execution of a job, and a deterministic assignment assures that each shard (including newly discovered shards) is assigned to exactly one source subtask.

> Below are a few examples of distribution with variation in hash function. ...
> And since there is no silver bullet the user should be able to override the hashing. 

Exactly, and that’s why I agree that it would make sense to support pluggable assignment hashing for the consumer.
This is especially true for the Kinesis consumer, because the exact sequence of shard ids after rescaling a Kinesis stream varies quite a bit depending on how the resharding occurred.
With this supported, I might also imagine that a user would want to savepoint and restore the job with a new assignment hashing that better suits the current sequence of shard ids, if the current assignment of the execution has become somewhat skewed after several reshardings.
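
As a rough illustration of the kind of hook this implies (a hypothetical sketch only; the interface name and signature below are not the actual API that a PR would introduce):

import java.io.Serializable;

/** Hypothetical pluggable assigner: maps a shard to an index; the consumer would take (index mod parallelism). */
public interface ShardAssigner extends Serializable {
    int assign(String streamName, String shardId, int numParallelSubtasks);
}

/** Example implementation using Guava consistent hashing over the shard id (scheme #2 above). */
class ConsistentHashShardAssigner implements ShardAssigner {
    @Override
    public int assign(String streamName, String shardId, int numParallelSubtasks) {
        return com.google.common.hash.Hashing.consistentHash(shardId.hashCode(), numParallelSubtasks);
    }
}

A consumer exposing such a hook could keep the current hashing as the default; the savepoint-and-restore scenario above would then just mean supplying a different assigner implementation when restoring.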

> Do you have a scenario in mind where we would not want to retain
> checkpointed assignments?

What do you mean by checkpoint “assignments”? The assignment from shard-to-source is only fixed within a single execution of the job. We only checkpoint the progress of each shard in the state.
Given that we support plugging in custom shard assignment hashing, then the assignment could potentially change every time we restore.

If what you mean is actually retaining checkpointed shard state (i.e. the progress sequence number), then:
I don’t really see a reason why a user would want to ignore checkpointed shard sequence numbers, but it could really just be my lack of knowledge for possible real user scenarios.
Though, ignoring checkpointed shard sequence numbers on restore from a savepoint would immediately break exactly-once guarantees, so if we do have a case for that, we need to be very educative in its use and side effects.

Cheers,
Gordon


Re: Kinesis consumer shard skew - FLINK-8516

Thomas Weise
>
>
> What do you mean by checkpoint “assignments”? The assignment from
> shard-to-source is only fixed within a single execution of the job. We only
> checkpoint the progress of each shard in the state.
> Given that we support plugging in custom shard assignment hashing, then
> the assignment could potentially change every time we restore.
>
> If what you mean is actually retaining checkpointed shard state (i.e. the
> progress sequence number), then:
> I don’t really see a reason why a user would want to ignore checkpointed
> shard sequence numbers, but it could really just be my lack of knowledge
> for possible real user scenarios.
> Though, ignoring checkpointed shard sequence numbers on restore from a
> savepoint would immediately break exactly-once guarantees, so if we do have
> a case for that, we need to be very educative in its use and side effects.
>
>
At the moment only the shard offsets are saved, and not the subtask
association. With "checkpointed assignments" I meant saving which shards
belong to which subtask, but that may lead to problems when changing the
consumer parallelism.

It seems that balanced distribution is hard to achieve without
synchronization between subtasks. There is the possibility of subtasks
intermittently retrieving different shard lists while resharding occurs.
The assignment logic would either need to consider only the shardID, resulting
in skewed distribution (current implementation), or there needs to be
a barrier at which each subtask is guaranteed to see the same shard list,
which would allow for round-robin distribution.
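
A toy example of that race (entirely made-up shard ids; the helper is illustrative, not consumer code):

import java.util.Arrays;
import java.util.List;

public class DiscoveryRaceSketch {

    static int ownerByIndex(List<String> sortedShards, String shardId, int parallelism) {
        return sortedShards.indexOf(shardId) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 2;
        // Subtask A ran discovery before the new shard 0003 became visible, subtask B after.
        List<String> seenByA = Arrays.asList("shardId-0001", "shardId-0002", "shardId-0004");
        List<String> seenByB = Arrays.asList("shardId-0001", "shardId-0002", "shardId-0003", "shardId-0004");

        System.out.println(ownerByIndex(seenByA, "shardId-0004", parallelism)); // prints 0
        System.out.println(ownerByIndex(seenByB, "shardId-0004", parallelism)); // prints 1
        // Without a barrier at which every subtask sees the same list, the same shard can be
        // claimed by two subtasks (or by none), which is exactly the problem described above.
    }
}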

In order to rebase the mapping after resharding, we would probably need all
subtasks to agree on the shard list and most recent offsets (distributed
consensus) and apply changes at a checkpoint barrier? I really don't see
how else we can end up with a balanced shard distribution as a generic solution.

Thanks,
Thomas

Re: Kinesis consumer shard skew - FLINK-8516

Thomas Weise
I created a PR for further discussion:

https://github.com/apache/flink/pull/5393

There are a few TODOs where I think improvements can be made. Let me know
if you agree with the overall direction.

Thanks,
Thomas



Re: Fwd: Re: Kinesis consumer shard skew - FLINK-8516

Tzu-Li (Gordon) Tai
Hi Thomas,

Thanks a lot for opening the PR.
I had a look at it and the comment you left, and left my thoughts there. In general, I think it’s heading towards the right direction.

Cheers,
Gordon

On 31 January 2018 at 4:03:36 PM, Thomas Weise ([hidden email]) wrote:

Hi Gordon,

Can you have a quick look at the PR and the comment I added? That will help to polish it up and make it ready for review.

Thanks!
Thomas

--
sent from mobile
---------- Forwarded message ----------
From: "Thomas Weise" <[hidden email]>
Date: Jan 30, 2018 5:53 PM
Subject: Re: Kinesis consumer shard skew - FLINK-8516
To: "Tzu-Li (Gordon) Tai" <[hidden email]>
Cc: <[hidden email]>




Re: Fwd: Re: Kinesis consumer shard skew - FLINK-8516

Thomas Weise
Thanks! I will clean up the PR and then open it for review.

--
sent from mobile
