Kinesis connector shard discovery mechanism

Евгений Юшин
Hi there

The Flink Kinesis consumer checks shard IDs against a particular pattern:
"^shardId-\\d{12}"

https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/model/StreamShardHandle.java#L132

While this is in line with the current Kinesis Data Streams server
implementation (all streams follow this pattern), it conflicts with the AWS docs:
*ShardId*
The unique identifier of the shard within the stream.
Type: String
Length Constraints: Minimum length of 1. Maximum length of 128.
*Pattern: [a-zA-Z0-9_.-]+*
Required: Yes

https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html
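To make the conflict concrete, here is a small sketch that checks a few IDs against both patterns. The class and method names are hypothetical, and the sketch uses whole-string matching for both regexes, which may differ from exactly how Flink applies its pattern.

```java
import java.util.List;
import java.util.regex.Pattern;

public class ShardIdPatternCheck {
    // Pattern the Flink consumer expects (from the message above).
    static final Pattern FLINK_SHARD_ID = Pattern.compile("^shardId-\\d{12}");
    // Pattern from the AWS API reference for Shard.ShardId.
    static final Pattern AWS_SHARD_ID = Pattern.compile("[a-zA-Z0-9_.-]+");

    // Whole-string match against the Flink-style pattern.
    static boolean matchesFlink(String id) {
        return FLINK_SHARD_ID.matcher(id).matches();
    }

    // Whole-string match against the AWS pattern plus the documented 1..128 length limit.
    static boolean matchesAws(String id) {
        return id.length() >= 1 && id.length() <= 128
                && AWS_SHARD_ID.matcher(id).matches();
    }

    public static void main(String[] args) {
        // The last two IDs are valid per AWS but rejected by the Flink-style check.
        for (String id : List.of("shardId-000000000000", "shard_1", "mock.shard-42")) {
            System.out.printf("%-22s flink=%b aws=%b%n", id, matchesFlink(id), matchesAws(id));
        }
    }
}
```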

*Intention:*
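We have no guarantees about the ShardId format and can't rely on any pattern
other than the one given in the AWS API reference.
Any custom Kinesis mock implementation can legitimately follow that reference,
which only requires ShardId to match [a-zA-Z0-9_.-]+ (alphanumerics plus '_',
'.' and '-'). The hard-coded pattern therefore prevents anyone from using
Flink with such mocks.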

The reason for using the particular pattern "^shardId-\\d{12}" is to build
Flink's custom shard comparator, to filter out already-seen shards, and to
pass only the latest shard ID to client.listShards, which limits the scope of
the RPC call to AWS.
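A minimal sketch of why the fixed format matters for these three uses: ordering, filtering and picking the "latest" shard all depend on parsing the 12-digit suffix. The class, method and field names here are hypothetical; this is not Flink's actual StreamShardHandle or KinesisDataFetcher code.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PatternBasedShardOrdering {
    private static final Pattern SHARD_ID = Pattern.compile("^shardId-(\\d{12})$");

    // Extracts the numeric suffix; only works when the ID has the expected format.
    static long suffix(String shardId) {
        Matcher m = SHARD_ID.matcher(shardId);
        if (!m.matches()) {
            throw new IllegalArgumentException("Unexpected shard ID format: " + shardId);
        }
        return Long.parseLong(m.group(1));
    }

    // Orders shard IDs by numeric suffix, in the spirit of the comparator described above.
    static final Comparator<String> BY_SUFFIX =
            Comparator.comparingLong(PatternBasedShardOrdering::suffix);

    // Keeps only shards strictly newer than lastSeen (null means nothing seen yet), sorted.
    static List<String> newerThan(List<String> shardIds, String lastSeen) {
        List<String> result = new ArrayList<>();
        for (String id : shardIds) {
            if (lastSeen == null || BY_SUFFIX.compare(id, lastSeen) > 0) {
                result.add(id);
            }
        }
        result.sort(BY_SUFFIX);
        return result;
    }

    public static void main(String[] args) {
        List<String> listed = List.of(
                "shardId-000000000002", "shardId-000000000000", "shardId-000000000001");
        List<String> fresh = newerThan(listed, "shardId-000000000000");
        System.out.println(fresh); // prints [shardId-000000000001, shardId-000000000002]

        // The latest ID would become the exclusive start shard ID of the next listShards call.
        String nextExclusiveStart = fresh.isEmpty() ? null : fresh.get(fresh.size() - 1);
        System.out.println(nextExclusiveStart); // prints shardId-000000000002

        // An ID such as "mock.shard-42" would make suffix() throw.
    }
}
```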

Meanwhile, I think we can get rid of this logic altogether. Its current
usages in the project are:
- working around a Kinesalite bug (I've already opened an issue for it:
https://github.com/mhart/kinesalite/issues/76). For now, we can move this
logic into the test code base to keep the production code clean:
https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L464

- adjusting the last seen shard ID. We can simply drop this, because the AWS
client won't return already-seen shards, so we will get either new IDs only
or nothing.
https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java#L475
https://github.com/apache/flink/blob/50d076ab6ad325907690a2c115ee2cb1c45775c9/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java#L406
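A rough sketch of what pattern-free discovery could look like, under two assumptions: the service returns only shards after the exclusive start ID, in order, so the last returned ID can be reused as-is; and the Kinesalite workaround can live in a test-only decorator that drops IDs it has already returned (the exact nature of the Kinesalite bug is assumed here). The ShardLister interface and all class names are hypothetical, not the AWS SDK or Flink API.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class PatternFreeShardDiscovery {

    // Hypothetical abstraction over a ListShards call.
    interface ShardLister {
        List<String> listShards(String exclusiveStartShardId);
    }

    private final ShardLister lister;
    private String lastSeenShardId; // treated as opaque: never parsed or compared

    PatternFreeShardDiscovery(ShardLister lister) {
        this.lister = lister;
    }

    // Assumes the lister returns only shards after exclusiveStartShardId.
    List<String> discoverNewShards() {
        List<String> fresh = lister.listShards(lastSeenShardId);
        if (!fresh.isEmpty()) {
            lastSeenShardId = fresh.get(fresh.size() - 1);
        }
        return fresh;
    }

    // Test-only decorator for a mock that may return already-listed shards again.
    static final class DedupingShardLister implements ShardLister {
        private final ShardLister delegate;
        private final Set<String> seen = new HashSet<>();

        DedupingShardLister(ShardLister delegate) {
            this.delegate = delegate;
        }

        @Override
        public List<String> listShards(String exclusiveStartShardId) {
            List<String> result = new ArrayList<>();
            for (String id : delegate.listShards(exclusiveStartShardId)) {
                if (seen.add(id)) { // add() returns false for IDs already seen
                    result.add(id);
                }
            }
            return result;
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in with non-Flink-style IDs that ignores the start ID,
        // so every call returns all shards, like a misbehaving mock.
        List<String> shards = new ArrayList<>(List.of("a.1", "b_2"));
        ShardLister buggyMock = start -> new ArrayList<>(shards);

        PatternFreeShardDiscovery discovery =
                new PatternFreeShardDiscovery(new DedupingShardLister(buggyMock));
        System.out.println(discovery.discoverNewShards()); // prints [a.1, b_2]
        shards.add("c-3");
        System.out.println(discovery.discoverNewShards()); // prints [c-3]
        System.out.println(discovery.discoverNewShards()); // prints []
    }
}
```

Against a well-behaved service the decorator is unnecessary, which is why it belongs in test code rather than in the production discovery path.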


What do you think?

Regards,
Eugen

Re: Kinesis connector shard discovery mechanism

Ying Xu-2
+1 on separating out the logic relevant to Kinesalite. Kinesalite is likely
used a lot in testing environments.


On Thu, Sep 13, 2018 at 1:52 AM, Евгений Юшин wrote the message above.