Igal Shilman created FLINK-16123:
------------------------------------
Summary: Add routable Kafka connector
Key: FLINK-16123
URL:
https://issues.apache.org/jira/browse/FLINK-16123 Project: Flink
Issue Type: Task
Components: Stateful Functions
Reporter: Igal Shilman
In some cases it is beneficial to associate a stateful function instance with a key in a Kafka topic.
In that case, a simplified Kafka ingress definition can be introduced.
Consider the following example:
Imagine a Kafka topic named "signups" (1) where the keys are ut8 strings representing user ids,
and the values are Protobuf messages of type (2) com.user.foo.bar.greeter.SingupMessage.
We would like to have a stateful function of type(3)
{code:java}
FunctionType( com.user.foo.bar, SingupProcessor{code}
to be invoked for each incoming signup message.
The following spec definition:
{code:java}
- ingress:
meta:
type: org.apache.flink.statefun.sdk.kafka/routable-kafka-connector
id: com.user.foo.bar/greeter
spec:
properties:
- consumer.group: greeter
topics:
- singups: (1)
typeUrl: (2) "com.user.foo.bar.greeter.SingupMessage"
target: (3) "com.user.foo.bar/SingupProcessor"
{code}
Defines a Kafka ingress that consumes <utf8 strings, bytes > from a singups topic,
and produces an Routable Protobuf message with the following type and properties:
{code}
message Routable {
Address target; (1)
Any payload;
}
{code}
Where:
(1) is Address(FunctionType(com.user.foo.bar, SingupProcessor), <a consumer record's key>)
(2) the Any's typeUrl would be com.user.foo.bar.greeter.SingupMessage and the value bytes
would come directly from the consumer record value bytes
This would require an additional AutoRoutable router,
that basically forwards the payload to the target address.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)