[DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

[DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

Őrhidi Mátyás
Dear Flink Community!

We have noticed a recent request for Hortonworks schema registry support (
FLINK-14577 <https://issues.apache.org/jira/browse/FLINK-14577>). We have
an implementation for it already, and we would be happy to contribute it to
Apache Flink.

You can find the documentation below[1]. Let us know your thoughts!

Best Regards,
Matyas

[1] Flink Avro Cloudera Registry User Guide
-----------------------------------------------------------

Add the following dependency to use the schema registry integration:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-avro-cloudera-registry</artifactId>
    <version>${flink.version}</version>
</dependency>


The schema registry can be plugged directly into the FlinkKafkaConsumer and
FlinkKafkaProducer using the appropriate schema:
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistryDeserializationSchema
- org.apache.flink.formats.avro.registry.cloudera.SchemaRegistrySerializationSchema


Supported types
----------------------
- Avro Specific Record types
- Avro Generic Records
- Basic Java Data types: byte[], Byte, Integer, Short, Double, Float, Long,
String, Boolean

SchemaRegistrySerializationSchema
--------------------------------------------------
The serialization schema can be constructed using the included builder
object SchemaRegistrySerializationSchema.builder(..).

Required settings:
- Topic configuration when creating the builder. Can be static or dynamic
(extracted from the data)
- RegistryAddress parameter on the builder to establish the connection

Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the produced Kafka messages:
  - by specifying a KeySelector function that extracts the key from each record
  - by using a Tuple2 stream for (key, value) pairs directly
- Security configuration

Example:
KafkaSerializationSchema<ItemTransaction> schema = SchemaRegistrySerializationSchema
    .<ItemTransaction>builder(topic)
    .setRegistryAddress(registryAddress)
    .setKey(ItemTransaction::getItemId)
    .build();

FlinkKafkaProducer<ItemTransaction> sink = new FlinkKafkaProducer<>(
    "dummy", schema, kafkaProps, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);
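
The optional settings above can be combined on the same builder. Below is a
minimal sketch of doing so; the Map-based signature assumed for setConfig and
the property key used are illustrative only, not part of the proposal:

// hypothetical registry client settings; the key name is illustrative only
Map<String, Object> registryConf = new HashMap<>();
registryConf.put("schema.registry.client.ssl.protocol", "TLSv1.2");

KafkaSerializationSchema<ItemTransaction> schema = SchemaRegistrySerializationSchema
    .<ItemTransaction>builder(topic)
    .setRegistryAddress(registryAddress)
    .setConfig(registryConf)              // arbitrary SchemaRegistry client configuration
    .setKey(ItemTransaction::getItemId)   // key extracted from each produced record
    .build();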

SchemaRegistryDeserializationSchema
-----------------------------------------------------
The deserialization schema can be constructed using the included builder
object SchemaRegistryDeserializationSchema.builder(..).
When reading messages (and keys), we always have to specify the expected
Class<T> or record Schema of the input records so that Flink can perform
any necessary conversion between the data in Kafka and the expected type.

Required settings:
- Class or Schema of the input messages depending on the data type
- RegistryAddress parameter on the builder to establish the connection

Optional settings:
- Arbitrary SchemaRegistry client configuration using the setConfig method
- Key configuration for the consumed Kafka messages:
  - should only be specified when we want to read the keys as well into a (key, value) stream
- Security configuration

Example:
KafkaDeserializationSchema<ItemTransaction> schema = SchemaRegistryDeserializationSchema
    .builder(ItemTransaction.class)
    .setRegistryAddress(registryAddress)
    .build();

// the consumer group id ("group.id") is provided via kafkaProps
FlinkKafkaConsumer<ItemTransaction> source =
    new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);
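
For Avro Generic Records, a sketch under the assumption that the builder also
accepts an Avro Schema (per "Class or Schema of the input messages" above);
the schema definition itself is only illustrative:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

Schema itemSchema = new Schema.Parser().parse(
    "{\"type\":\"record\",\"name\":\"ItemTransaction\",\"fields\":["
    + "{\"name\":\"itemId\",\"type\":\"string\"},"
    + "{\"name\":\"quantity\",\"type\":\"long\"}]}");

KafkaDeserializationSchema<GenericRecord> schema = SchemaRegistryDeserializationSchema
    .builder(itemSchema)
    .setRegistryAddress(registryAddress)
    .build();
FlinkKafkaConsumer<GenericRecord> source =
    new FlinkKafkaConsumer<>(inputTopic, schema, kafkaProps);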

Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

dwysakowicz
Hi Matyas,

I think this would be a valuable addition. You may reuse some of the
already available abstractions for writing an Avro deserialization schema
based on a schema registry (have a look at RegistryDeserializationSchema
and SchemaCoderProvider). There is also an open PR for adding a similar
serialization schema[1].
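
For reference, implementing those abstractions for a registry could look
roughly like the sketch below. The wire-format layout and the registry
lookup are assumptions made purely for illustration, not taken from the
actual Cloudera serde classes:

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import org.apache.avro.Schema;
import org.apache.flink.formats.avro.SchemaCoder;

public class ClouderaSchemaRegistryCoder implements SchemaCoder {

    @Override
    public Schema readSchema(InputStream in) throws IOException {
        DataInputStream dataIn = new DataInputStream(in);
        // Assumed payload layout: a protocol marker byte followed by the
        // schema version id; the real layout must be taken from the
        // registry's serde implementation.
        dataIn.readByte();
        long schemaVersionId = dataIn.readLong();
        return lookupSchema(schemaVersionId);
    }

    private Schema lookupSchema(long schemaVersionId) {
        // Fetch and parse the writer schema via the registry client
        // (omitted in this sketch).
        throw new UnsupportedOperationException("sketch only");
    }

    /** Provider so the coder can be created on the task managers. */
    public static class Provider implements SchemaCoder.SchemaCoderProvider {
        @Override
        public SchemaCoder get() {
            return new ClouderaSchemaRegistryCoder();
        }
    }
}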

My only concern is that I am not 100% sure what the consensus is on which
connectors we want to accept into the main repository and which we would
prefer to be hosted separately and listed on the ecosystem webpage[2]
(which I hope will be published soon).

Whichever option is preferred, I could help review the code.

Best,

Dawid

[1] https://github.com/apache/flink/pull/8371

[2]
http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Create-a-Flink-ecosystem-website-td27519.html


Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

Gyula Fóra-2
Thanks Matyas for starting the discussion!
I think this would be a very valuable addition to Flink as many companies
are already using the Hortonworks/Cloudera registry and it would enable
them to connect to Flink easily.

@Dawid:
Regarding the implementation, this is a much more lightweight connector
than what we have now for the Confluent registry and the PR you linked. It
wraps the Cloudera registry directly, providing a very thin wrapper plus
some enhanced functionality for handling Kafka message keys.

As for the question of main repo vs. outside, I would prefer this to be
included in the main repo, similar to the Confluent registry connector.
Unless we decide to move all of these connectors out, I would like to take
a consistent approach.

Cheers,
Gyula


Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

dwysakowicz
Hi Gyula,

I did not want to discourage this contribution in any way. I do agree we
should treat this connector the same as the Confluent schema registry one.
I just wanted to express my uncertainty about the general approach to new
connector contributions.

As for the second point: do you mean that you are wrapping the
KafkaAvroDeserializer/Serializer provided by the Cloudera/Hortonworks
schema registry?

Personally, I would very much prefer using the SchemaCoder approach. All
such schemas boil down to two steps: (de)serializing the schema with a
registry-specific protocol, and (de)serializing the record itself. I think
the SchemaCoder approach has the benefit that we can optimize the
instantiation of Avro's readers and writers in a unified way. It is also
easier to maintain, as we have a single point where the actual record
(de)serialization happens, and it provides a unified way of instantiating
the TypeInformation. Could you explain why you would prefer not to use
this approach?

Best,

Dawid


Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

Gyula Fóra-2
Hi Dawid,

In general I agree that if we can provide a completely unified way of
handling these registries that would be great, but I wonder if that makes
sense in the long term. While the Cloudera schema registry only supports
Avro at the moment, it aims to support other formats in the future, and
accessing that functionality will probably rely on using those specific
serializer/deserializer implementations. This might not be a valid concern
at this point though :)

The reason we went with wrapping the KafkaAvroDeserializer/Serializer
directly for now is that it was super simple to do, and the current
SchemaCoder approach lacks a lot of flexibility/functionality.

The schema itself doesn't always come from the serialized data (in this
case I believe it is stored either in the serialized data or in the Kafka
record metadata), and we also want to be able to handle Kafka message keys.
I guess these could be solved by making the deserialization logic
Kafka-specific and exposing the ConsumerRecord, but that would completely
change the current SchemaCoder-related interfaces. A rough sketch of the
wrapping approach is included below.
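
To make the wrapping concrete, here is a minimal sketch of the idea (not
the actual contributed code; the Hortonworks deserializer class below and
its configuration details should be treated as assumptions):

import java.util.Map;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import com.hortonworks.registries.schemaregistry.serdes.avro.kafka.KafkaAvroDeserializer;

public class WrappingRegistryDeserializationSchema<T> implements KafkaDeserializationSchema<T> {

    private final Map<String, Object> registryConfig;
    private transient KafkaAvroDeserializer deserializer;

    public WrappingRegistryDeserializationSchema(Map<String, Object> registryConfig) {
        this.registryConfig = registryConfig;
    }

    @Override
    @SuppressWarnings("unchecked")
    public T deserialize(ConsumerRecord<byte[], byte[]> record) {
        if (deserializer == null) {
            // lazily create and configure the registry deserializer on the task manager
            deserializer = new KafkaAvroDeserializer();
            deserializer.configure(registryConfig, false); // false = value deserializer
        }
        // the registry serde resolves the writer schema from the payload/metadata
        return (T) deserializer.deserialize(record.topic(), record.value());
    }

    @Override
    public boolean isEndOfStream(T nextElement) {
        return false;
    }

    @Override
    public TypeInformation<T> getProducedType() {
        // in the real connector this is derived from the configured class/schema;
        // left out of this sketch
        throw new UnsupportedOperationException("sketch only");
    }
}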

Cheers,
Gyula


Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

dwysakowicz

Hi Gyula,

First of all sorry for the delayed response.

I see the argument for handling metadata from Kafka headers. I hadn't noticed that the schema you are proposing is actually a KafkaDeserializationSchema, which means it works only with Kafka.

I still believe it would be really beneficial for the community to have a more general registry schema, but if we want to support the schema being encoded in the record's metadata, it would require a rework of the hierarchy of the (Connector)DeserializationSchemas, which I guess should be discussed separately.

Having said that, I tend to agree with you that it would make sense to add the thin wrapper as an initial version, especially as you are suggesting hiding the implementation details behind a builder. Some comments on the design:

* I would make it more explicit in the entry point that this works with the Cloudera (Hortonworks) schema registry (maybe something like ClouderaRegistryDeserializationSchema.builder()).

* I would also make it more explicit that it constructs only a Kafka(De)serializationSchema; see the sketch after this list.

* We should consider the dependency design. This schema, in contrast to the Confluent one, would pull in Kafka consumer dependencies. If we add a schema that could deserialize data from other systems, we should not pull in the Kafka dependencies automatically.
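
A purely illustrative combination of the first two points (not an agreed-upon API, just to show the direction):

KafkaDeserializationSchema<ItemTransaction> schema =
    ClouderaRegistryKafkaDeserializationSchema   // hypothetical name making both "Cloudera" and "Kafka" explicit
        .builder(ItemTransaction.class)
        .setRegistryAddress(registryAddress)
        .build();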

Best,

Dawid


Re: [DISCUSS] Flink Avro Cloudera Registry (FLINK-14577)

Őrhidi Mátyás
Thanks Dawid, Gyula,

We will then create a pull request with the proposed changes, where we can
further elaborate on the dependencies.

Regards,
Matyas
