[jira] [Created] (FLINK-15769) Allow configuring offset startup positions for Stateful Functions Kafka Ingress


Shang Yuanchun (Jira)
Tzu-Li (Gordon) Tai created FLINK-15769:
-------------------------------------------

             Summary: Allow configuring offset startup positions for Stateful Functions Kafka Ingress
                 Key: FLINK-15769
                 URL: https://issues.apache.org/jira/browse/FLINK-15769
             Project: Flink
          Issue Type: New Feature
          Components: Stateful Functions
            Reporter: Tzu-Li (Gordon) Tai
            Assignee: Tzu-Li (Gordon) Tai


Users typically expect to be able to configure where consumption of a Kafka topic starts.
Since the Stateful Functions Kafka ingress sits on top of Flink's Kafka consumer, there are already various startup modes to choose from:

* {{GROUP_OFFSETS}} (default): start from the offsets committed to Kafka for the given {{group.id}}
* {{LATEST}}: start from the latest record in the topic
* {{EARLIEST}}: start from the earliest record in the topic
* {{SPECIFIC_OFFSETS}}: provide a map of topic partition -> offset. This is particularly important for bootstrapped state scenarios, where the user wants to start from a specific position consistent with the state bootstrapped in their functions.
* {{TIMESTAMP}}: start from the first record in each partition whose timestamp is greater than or equal to the given timestamp.
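
To make the {{TIMESTAMP}} mode concrete, the following is a minimal sketch of how a start offset could be derived from a timestamp for a single partition. It assumes the semantics of Flink's Kafka consumer, where the start position is the first record whose timestamp is at or after the given one. The class {{TimestampStartSketch}} and the method {{firstOffsetAtOrAfter}} are hypothetical names used only for illustration; they are not part of Flink or Stateful Functions.

```java
import java.util.OptionalLong;

// Hypothetical helper for illustration only; not part of Flink or Stateful Functions.
final class TimestampStartSketch {

    /**
     * Finds the first offset whose record timestamp is >= targetTimestamp.
     *
     * @param timestamps      record timestamps of one partition, in offset order
     * @param baseOffset      offset of the first record in the array
     * @param targetTimestamp timestamp to start from
     * @return the matching offset, or empty if every record is older than the target
     */
    static OptionalLong firstOffsetAtOrAfter(long[] timestamps, long baseOffset, long targetTimestamp) {
        // Linear scan: timestamps are not guaranteed to be monotonic within a partition.
        for (int i = 0; i < timestamps.length; i++) {
            if (timestamps[i] >= targetTimestamp) {
                return OptionalLong.of(baseOffset + i);
            }
        }
        return OptionalLong.empty();
    }

    public static void main(String[] args) {
        long[] timestamps = {1000L, 1500L, 1500L, 2200L};
        System.out.println(firstOffsetAtOrAfter(timestamps, 40L, 1200L));
        System.out.println(firstOffsetAtOrAfter(timestamps, 40L, 3000L));
    }
}
```

An empty result corresponds to a partition with no record at or after the timestamp; what the ingress does in that case is a separate decision that this sketch does not model.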

The proposed API looks like so:

{code}
KafkaIngressBuilder<T> builder = KafkaIngressBuilder.forIdentifier(...)
    .withTopic(...)
    .withDeserializer(...)
    .withDefaultStartPosition(KafkaIngressStartPosition.fromEarliest()/fromLatest())
    .withSpecificStartOffsets(KafkaIngressStartOffsets.fromMap(Map)/fromTimestamp(Long))
{code}

The {{withDefaultStartPosition}} method is straightforward.
It is kept separate from {{withSpecificStartOffsets}} because some partitions may not contain the offsets specified via {{withSpecificStartOffsets}}.
In that case, the ingress needs to fall back to a default; this is the position configured with {{withDefaultStartPosition}}.
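
The fallback rule described above can be sketched as follows. This is only a model under stated assumptions, not the proposed implementation: {{StartOffsetResolver}}, {{DefaultPosition}} and {{resolve}} are hypothetical names, partitions are identified by a plain integer, and a requested offset counts as "not contained" when the partition has no entry in the map or the offset lies outside the range currently available in the partition.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the fallback rule; not the actual Stateful Functions API.
final class StartOffsetResolver {

    enum DefaultPosition { EARLIEST, LATEST }

    /**
     * Resolves the offset at which a partition should start.
     *
     * @param partition       partition id (assumed to be a plain int in this sketch)
     * @param specificOffsets user-specified start offsets, keyed by partition id
     * @param fallback        default position to use when no usable offset was specified
     * @param earliest        earliest offset currently available in the partition
     * @param latest          latest offset currently available in the partition
     */
    static long resolve(int partition,
                        Map<Integer, Long> specificOffsets,
                        DefaultPosition fallback,
                        long earliest,
                        long latest) {
        Long requested = specificOffsets.get(partition);
        // Use the specific offset only if one was given and the partition contains it.
        if (requested != null && requested >= earliest && requested <= latest) {
            return requested;
        }
        // Otherwise fall back to the configured default position.
        return fallback == DefaultPosition.EARLIEST ? earliest : latest;
    }

    public static void main(String[] args) {
        Map<Integer, Long> specific = new HashMap<>();
        specific.put(0, 42L); // only partition 0 has a specific start offset

        System.out.println(resolve(0, specific, DefaultPosition.EARLIEST, 10L, 100L));
        System.out.println(resolve(1, specific, DefaultPosition.EARLIEST, 10L, 100L));
        System.out.println(resolve(0, specific, DefaultPosition.LATEST, 50L, 100L));
    }
}
```

Keeping the default position as a separate argument mirrors the split between {{withDefaultStartPosition}} and {{withSpecificStartOffsets}}: the specific offsets can stay partial, and every partition still resolves to a well-defined start position.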



--
This message was sent by Atlassian Jira
(v8.3.4#803005)