[DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

[DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

Gyula Fóra
Hi all!

While implementing a new custom Flink serialization schema that wraps an
existing Kafka serializer, I realized we are missing two key methods that
could easily be added:

void configure(java.util.Map<java.lang.String,?> configs);
void close();

We could rename configure to open, but Kafka serializers already have a
configure method. The configure method would be called when the operator
starts, with the provided Kafka properties, and close would be called when
it shuts down.

Currently there is no way to access the properties from the schema
interfaces or close the schema on failure.

This would be a very simple addition: the methods could be added as default
(no-op) methods on the interface so as not to break any schemas that are
implemented as lambdas.
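Something like this could work; just an illustrative sketch, not a final
interface definition:

import java.io.Serializable;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative sketch only -- not the actual Flink interface.
public interface KafkaSerializationSchema<T> extends Serializable {

    ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp);

    // Proposed: called once when the operator starts, with the Kafka
    // properties that were passed to the producer/consumer.
    default void configure(Map<String, ?> configs) {}

    // Proposed: called once when the operator shuts down, also on failure.
    default void close() {}
}

Since serialize stays the only abstract method, the interface remains a
functional interface and lambda implementations keep compiling.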

What do you think?

Gyula

Re: [DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

Arvid Heise
Hi Gyula,

When looking at the ConfluentRegistryAvroDeserializationSchema [1], it
seems like the intended way is to pass all configuration parameters in the
constructor, so you could do there what open would otherwise do.
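Something along these lines, just to illustrate the pattern (the schema and
client types are made up for the example):

import java.io.Serializable;

// Made-up example: configuration is passed in the constructor and the
// non-serializable client is created lazily on the first record.
public class MyRegistryDeserializationSchema implements Serializable {

    // Stand-in for a real (non-serializable) registry client.
    static class RegistryClient {
        RegistryClient(String url) { /* connect to the registry */ }
        String decode(byte[] message) { return new String(message); }
    }

    private final String registryUrl;         // configuration, passed up front
    private transient RegistryClient client;  // created lazily

    public MyRegistryDeserializationSchema(String registryUrl) {
        this.registryUrl = registryUrl;
    }

    public String deserialize(byte[] message) {
        if (client == null) {
            // effectively the "open" logic, run on the first record
            client = new RegistryClient(registryUrl);
        }
        return client.decode(message);
    }
}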

Could you please lay out in more detail why this is not enough? What would
you do in open and close, respectively?

Best,

Arvid

[1]
https://github.com/apache/flink/blob/master/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/ConfluentRegistryAvroDeserializationSchema.java

Re: [DISCUSS] Adding configure and close methods to the Kafka(De)SerializationSchema interfaces

Gyula Fóra
Hi Arvid,

The ConfluentRegistryAvroDeserializationSchema calls checkAvroInitialized()
for every single record just to initialize the schema the first time. This
is clearly a symptom of a missing open/configure method. In addition, some
Kafka serializers rely on properties that are usually passed together with
the Kafka configuration. A configure method that receives the Kafka
properties provides a familiar way of implementing this without having to
pass the properties twice (once for the Producer/Consumer and once for the
schema).

The example you mentioned doesn't implement any closing logic. Imagine if
the schema registry client created a background thread to fetch data and
had to be closed; there is no way to do that today. The Confluent schema
registry doesn't work this way, but other registries might.
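To make this concrete, wrapping an existing Kafka serializer could then
look roughly like this (a sketch against the interface from my first mail;
configure and close are the proposed additions, and serializability of the
wrapped serializer is ignored for brevity):

import java.util.Map;
import javax.annotation.Nullable;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serializer;

public class WrappingSerializationSchema<T> implements KafkaSerializationSchema<T> {

    private final String topic;
    private final Serializer<T> wrapped;  // the existing Kafka serializer

    public WrappingSerializationSchema(String topic, Serializer<T> wrapped) {
        this.topic = topic;
        this.wrapped = wrapped;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Receives the same properties the user already passed to the
        // Kafka producer, so nothing has to be configured twice.
        wrapped.configure(configs, false /* isKey */);
    }

    @Override
    public ProducerRecord<byte[], byte[]> serialize(T element, @Nullable Long timestamp) {
        return new ProducerRecord<>(topic, wrapped.serialize(topic, element));
    }

    @Override
    public void close() {
        // E.g. stop a registry client's background fetcher thread.
        wrapped.close();
    }
}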

I hope this answers your question.

Gyula

