Avro Schema Resolution Compatibility

Avro Schema Resolution Compatibility

Dominik Wosiński
Hey,
I have a question regarding Avro types and schema evolution. According to
the docs, the schema resolution is compatible with the Avro specification [1].

But I have done some testing. For example, I created a record, wrote it
to Kafka, then changed the order of the fields in the schema and tried to
read the data with the changed schema using AvroDeserializationSchema from
Flink. It failed with an IndexOutOfBoundsException during deserialization,
but according to the docs fields are resolved by name during
deserialization, so the changed ordering should not really be a problem.

I have also found some other inconsistencies, for example when the writer
is missing a field that is optional for the reader, i.e. it has a default
value. According to the Avro docs this should be resolved, but I only
managed to get it resolved if the missing field is at the end, not if it's
in the middle.

Is there anything that I don't understand here, or is something wrong?

Best Regards,
Dom.

[1] http://avro.apache.org/docs/current/spec.html#Schema+Resolution

Re: Avro Schema Resolution Compatibility

dwysakowicz
Hi Dominik,

I am not sure which documentation you refer to when saying "according to
the docs the schema resolution is compatible with the Avro docs", but I
assume it is this one [1]. If that is the case, then
AvroDeserializationSchema plays no role in this process. That page
describes the evolution of the schema of Flink's state. What that means
is: if you use an Avro class for objects in your state, take a savepoint,
and then restore your job with an updated schema for those Avro objects,
Flink should migrate the state to the new schema.
AvroDeserializationSchema is used when reading records from external
systems, e.g. from Kafka.

A general remark on Avro's schema migration (not Flink's per se): the Avro
reader needs both the schema with which the record was written (the
writer's schema) and the current schema (the reader's schema) to perform
the migration.
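
To illustrate with plain Avro (outside Flink): resolving a record that was
written with a differently ordered schema works only if the reader gets
both schemas. A minimal sketch, with made-up field names:

    import java.io.ByteArrayOutputStream;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    // Writer's schema: fields (id, name). Reader's schema: same fields, reordered.
    Schema writerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"id\",\"type\":\"long\"}]}");

    // Encode a record with the writer's schema.
    GenericRecord written = new GenericData.Record(writerSchema);
    written.put("id", 1L);
    written.put("name", "test");
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
    new GenericDatumWriter<GenericRecord>(writerSchema).write(written, encoder);
    encoder.flush();

    // Decoding with both schemas resolves the fields by name, so the reordering
    // is fine. Decoding with the reader's schema alone would interpret the bytes
    // positionally instead.
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
    GenericRecord resolved = new GenericDatumReader<GenericRecord>(writerSchema, readerSchema)
        .read(null, decoder);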

In the case of AvroDeserializationSchema both are always equal, therefore
you cannot read records written with a different schema (we are missing
the writer's schema). If you want to support reading records written with
different schemas, you need to use RegistryAvroDeserializationSchema. That
DeserializationSchema can provide the writer's schema for every record.
Flink, out of the box, provides an implementation of
RegistryAvroDeserializationSchema that integrates with Confluent's schema
registry for providing the writer's schema, but you are free to provide
your own implementation.
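
For example, the Confluent integration can be wired into a Kafka source
roughly like this (a sketch assuming Flink 1.9 with the
flink-avro-confluent-registry dependency; the topic, registry URL and
reader schema are made up):

    import java.util.Properties;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    // Reader's schema, i.e. the schema the job wants to see the records in.
    Schema readerSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");

    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "example-group");

    // The deserialization schema fetches the writer's schema from the registry
    // per record and resolves it against the reader's schema above.
    FlinkKafkaConsumer<GenericRecord> consumer = new FlinkKafkaConsumer<>(
        "events",
        ConfluentRegistryAvroDeserializationSchema.forGeneric(
            readerSchema, "http://localhost:8081"),
        props);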

Coming back to the state migration: the way it works is that Flink writes
the Avro schema as part of the snapshot. Therefore it is possible to
migrate the whole state to the changed schema upon restoring. The new
snapshot will be written entirely with the new, updated Avro schema.
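
For illustration, state that uses an Avro class (the case described above)
might be declared like this; MyAvroEvent here is a made-up, Avro-generated
SpecificRecord class:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class LatestEventFunction
            extends KeyedProcessFunction<String, MyAvroEvent, MyAvroEvent> {

        // State holding an Avro-generated type; its schema is written with the
        // snapshot, so the state can be migrated when the class is regenerated
        // from a newer, compatible schema.
        private transient ValueState<MyAvroEvent> latest;

        @Override
        public void open(Configuration parameters) {
            latest = getRuntimeContext().getState(
                new ValueStateDescriptor<>("latest-event", MyAvroEvent.class));
        }

        @Override
        public void processElement(MyAvroEvent event, Context ctx, Collector<MyAvroEvent> out)
                throws Exception {
            latest.update(event);
            out.collect(event);
        }
    }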

Hope this clarifies how the integration with Avro in Flink works.

Best,

Dawid


[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html#avro-types


Re: Avro Schema Resolution Compatibility

Dominik Wosiński
Hey Dawid,
Thanks a lot. I had indeed missed the part that this is actually about the
state, not the deserialization itself.
It all seems clear and consistent now.

Thanks again,
Best Regards,
Dom.



Re: Avro Schema Resolution Compatibility

Arvid Heise
Hi Dominik,

just to add to Dawid's explanation: to have proper schema evolution of
Avro data, Avro needs to know the schema with which the data was written.
For state, that means we store the schema once for all state records in
the state file, since they all belong to the same schema version in the
same checkpoint (it's actually a bit more complicated, but I'll omit that
for now).

For data coming from Kafka or flowing into Kafka, there may be a very
large variety of schemas with which the data is written. Although I
wouldn't recommend it at all, it would even be possible to store records
with incompatible schemas. Hence, it is absolutely necessary to track the
schema at the record level to properly support schema evolution. Since the
schema is oftentimes bigger than the actual record, it is common practice
to store only a pointer to the actual schema in the data, which is a
5-byte header in the case of the Confluent schema registry serialization
schema (1 marker byte, a 4-byte id). The Confluent schema registry also
ensures schema compatibility if configured correctly.
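
As a sketch, that framing looks roughly like this when taking a Kafka
record value apart by hand (the variable names are made up):

    import java.nio.ByteBuffer;

    // Confluent wire format: 1 magic byte, a 4-byte big-endian schema id,
    // then the Avro binary-encoded record itself.
    ByteBuffer buffer = ByteBuffer.wrap(kafkaValueBytes);   // made-up byte[] from Kafka
    byte magic = buffer.get();                              // expected to be 0x0
    int schemaId = buffer.getInt();                         // id of the writer's schema in the registry
    byte[] avroPayload = new byte[buffer.remaining()];
    buffer.get(avroPayload);                                // decode with writer's + reader's schema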

While, in theory, we could support other techniques for referring to the
schema, there is currently no clear alternative to the Confluent schema
registry yet.

Best,

Arvid
