Hey,
I have a question regarding Avro types and schema evolution. According to the docs, the schema resolution is compatible with the Avro docs [1]. But I have done some testing. For example, I created a record, wrote it to Kafka, then changed the order of the fields in the schema and tried to read the data with the changed schema using Flink's AvroDeserializationSchema. It failed with an IndexOutOfBoundsException during deserialization, although according to the docs fields are resolved by name during deserialization, so a changed field order should not be a problem.

I have also found some other inconsistencies, e.g. when the writer is missing a field that is optional for the reader (i.e. it has a default value). According to the Avro docs this should be resolved, but I only managed to get it to resolve if the missing field is at the end of the schema, not if it is in the middle.

Is there anything that I don't understand here, or is something wrong?

Best Regards,
Dom.

[1] http://avro.apache.org/docs/current/spec.html#Schema+Resolution
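P.S. For concreteness, this is roughly the kind of setup I was testing. It is only a sketch: the topic name, field names, and Kafka properties are made up for illustration, and the reader schema simply lists the same fields in a different order than the producer used.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ReorderedSchemaTest {
    public static void main(String[] args) throws Exception {
        // Reader schema: same fields as the producer's schema, but in a different order.
        Schema readerSchema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
          + "{\"name\":\"count\",\"type\":\"long\"},"
          + "{\"name\":\"id\",\"type\":\"string\"}]}");

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "avro-test");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(new FlinkKafkaConsumer<GenericRecord>(
                "events",                                            // hypothetical topic
                AvroDeserializationSchema.forGeneric(readerSchema),
                props))
           .print();

        // Deserialization fails with an IndexOutOfBoundsException when the topic
        // contains records written with the original field order.
        env.execute("reordered-schema-test");
    }
}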
Hi Dominik,
I am not sure which documentation you are referring to when you say "According to the docs the schema resolution is compatible with the Avro docs", but I assume it is this one [1]. If that is the case, then AvroDeserializationSchema plays no role in this process. That page describes the evolution of the schema of Flink's state. What that means is: if you use an Avro class for objects in your state, take a savepoint, and then restore your job with an updated schema for those Avro objects, Flink should migrate the state to the new schema. AvroDeserializationSchema is used when reading records from external systems, e.g. from Kafka.

A general remark on Avro's schema migration (not Flink's per se): the Avro reader needs both the schema with which the record was written (the writer's schema) and the current schema (the reader's schema) to perform the migration.

In the case of AvroDeserializationSchema both are always equal, therefore you cannot read records written with different schemas (the writer's schema is missing). If you want to support reading records written with different schemas, you need to use RegistryAvroDeserializationSchema. That DeserializationSchema can provide the writer's schema for every record. Out of the box, Flink provides an implementation of RegistryAvroDeserializationSchema that integrates with Confluent's schema registry for providing the writer's schema, but you are free to provide your own implementation.

Coming back to the state migration: the way it works is that Flink writes the Avro schema as part of the snapshot. Therefore it is possible to migrate the whole state to the changed schema upon restoring. The new snapshot will then be written entirely with the new, updated Avro schema.

Hope this clarifies how the integration with Avro in Flink works.

Best,
Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/schema_evolution.html#avro-types
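P.S. A minimal sketch to make the writer's/reader's schema distinction concrete. This is plain Avro, no Flink involved, and the record and field names are made up for illustration: the reader schema reorders the fields and adds an optional field with a default, and resolution works only because the reader is given both schemas.

import java.io.ByteArrayOutputStream;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;

public class SchemaResolutionSketch {

    // The schema the record was written with.
    private static final Schema WRITER = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"id\",\"type\":\"string\"},"
      + "{\"name\":\"count\",\"type\":\"long\"}]}");

    // The current schema: fields reordered, plus an optional field with a default.
    private static final Schema READER = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"fields\":["
      + "{\"name\":\"count\",\"type\":\"long\"},"
      + "{\"name\":\"comment\",\"type\":\"string\",\"default\":\"\"},"
      + "{\"name\":\"id\",\"type\":\"string\"}]}");

    public static void main(String[] args) throws Exception {
        // Serialize one record with the writer schema, as a Kafka producer would.
        GenericRecord written = new GenericData.Record(WRITER);
        written.put("id", "abc");
        written.put("count", 42L);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(WRITER).write(written, encoder);
        encoder.flush();

        // Deserialize with BOTH schemas: Avro resolves fields by name, so the
        // reordering and the defaulted extra field are handled correctly.
        Decoder decoder = DecoderFactory.get().binaryDecoder(out.toByteArray(), null);
        GenericRecord read =
            new GenericDatumReader<GenericRecord>(WRITER, READER).read(null, decoder);
        System.out.println(read); // {"count": 42, "comment": "", "id": "abc"}

        // AvroDeserializationSchema only knows the reader schema, so it cannot do
        // this resolution; a registry-based schema (e.g. Flink's
        // ConfluentRegistryAvroDeserializationSchema) instead looks up the writer
        // schema per record.
    }
}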
Hey Dawid,
Thanks a lot. I had indeed missed that this is actually about state, not about deserialization itself. It all seems clear and consistent now.

Thanks again,
Best Regards,
Dom.
Hi Dominik,
just to add to Dawid's explanation: to have proper schema evolution on Avro data, the reader needs to know the schema with which the data was written. For state, that means we store the schema that was used once for all state records in the state file, since they all belong to the same schema version in the same checkpoint (it is actually a bit more complicated, but I'll omit that for now).

For data coming from or flowing into Kafka, there may be a very large variety of schemas with which data is written. Although I wouldn't recommend it at all, it would even be possible to store records with incompatible schemas. Hence, it is absolutely necessary to store the schema at the record level to properly support schema evolution. Since the schema is often bigger than the actual record, it is common practice to store only a pointer to the actual schema with the data; in the case of the Confluent schema registry serialization schema this is a 5-byte header (1 marker byte, 4 bytes for the schema id). The Confluent schema registry also ensures schema compatibility if configured correctly.

While in theory we could support other techniques for referring to the schema, there is currently no clear alternative to the Confluent schema registry yet.

Best,
Arvid
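P.S. To illustrate the framing, here is a sketch of how the Confluent wire format can be taken apart. This only shows the byte layout described above; it is not a Flink or Confluent API, and the class and method names are my own.

import java.nio.ByteBuffer;
import java.util.Arrays;

public class ConfluentWireFormat {

    /** Reads the schema id from the 5-byte header: 1 marker byte (0) + 4-byte big-endian id. */
    public static int schemaId(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        if (buf.get() != 0) {
            throw new IllegalArgumentException("Not in Confluent wire format");
        }
        return buf.getInt();
    }

    /** Everything after the 5-byte header is the plain Avro binary payload. */
    public static byte[] avroPayload(byte[] message) {
        return Arrays.copyOfRange(message, 5, message.length);
    }
}

The schema id is then resolved against the registry to obtain the writer's schema, and the payload is decoded with the writer's and reader's schemas as in Dawid's description.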