Svend Vanderveken created FLINK-20999:
-----------------------------------------

             Summary: Confluent Avro Format should document how to serialize Kafka keys
                 Key: FLINK-20999
                 URL: https://issues.apache.org/jira/browse/FLINK-20999
             Project: Flink
          Issue Type: Improvement
          Components: Documentation
    Affects Versions: 1.12.0
            Reporter: Svend Vanderveken


The [Confluent Avro Format|https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/formats/avro-confluent.html] only shows examples of how to serialize/deserialize Kafka values. Also, the parameter descriptions do not always make clear whether a parameter influences the source or the sink behaviour, IMHO.

This is surprising, especially in the context of a Kafka sink connector, since keys are such an important concept there. Adding examples of how to serialize/deserialize Kafka keys would add clarity.

While it can be argued that a connector format is independent from the underlying storage, showing Kafka-oriented examples in this case (i.e. with a concept of "key" and "value") probably makes sense here, since this connector is very much designed with Kafka in mind.

I suggest adding the following examples:

h3. Writing to Kafka while keeping the keys in "raw" big-endian format:

{code:java}
CREATE TABLE OUTPUT_TABLE (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',

  'key.format' = 'raw',
  'key.raw.endianness' = 'big-endian',
  'key.fields' = 'user_id',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'value.avro-confluent.schema-registry.subject' = 'user_behavior'
)
{code}

h3. Writing to Kafka while registering both the key and the value in the schema registry:

{code:java}
CREATE TABLE OUTPUT_TABLE (
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',

  -- => this will register a {user_id: long} Avro type in the schema registry.
  -- Watch out: schema evolution in the context of a Kafka key is almost never backward nor
  -- forward compatible in practice due to hash partitioning (changing the serialized key
  -- bytes changes the partition a given entity is routed to).
  'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'key.avro-confluent.schema-registry.subject' = 'user_behavior_key',
  'key.format' = 'avro-confluent',
  'key.fields' = 'user_id',

  'value.format' = 'avro-confluent',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'value.avro-confluent.schema-registry.subject' = 'user_behavior_value'
)
{code}
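For either of the two sink definitions above, the docs could also show how rows actually get written into the table. A minimal sketch, assuming a pre-existing source table (the name {{source_user_behavior}} is hypothetical, only for illustration):

{code:java}
-- Each emitted row is split according to the sink DDL above: the 'user_id'
-- column feeds the Kafka key (per 'key.fields'), while the Avro value columns
-- are governed by 'value.fields-include'.
INSERT INTO OUTPUT_TABLE
SELECT user_id, item_id, category_id, behavior
FROM source_user_behavior
{code}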
h3. Reading from Kafka with both the key and value schema in the registry, while resolving field name clashes:

{code:java}
CREATE TABLE INPUT_TABLE (
  -- user_id as read from the kafka key:
  from_kafka_key_user_id BIGINT,

  -- user_id, and other fields, as read from the kafka value:
  user_id BIGINT,
  item_id BIGINT,
  category_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'localhost:9092',

  'key.format' = 'avro-confluent',
  'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
  'key.fields' = 'from_kafka_key_user_id',

  -- Adds a column prefix when mapping the avro fields of the kafka key to columns of this Table,
  -- to avoid clashes with avro fields of the value (both contain 'user_id' in this example)
  'key.fields-prefix' = 'from_kafka_key_',

  'value.format' = 'avro-confluent',
  -- the value must not include the key fields here, since they are dealt with above
  'value.fields-include' = 'EXCEPT_KEY',
  'value.avro-confluent.schema-registry.url' = 'http://localhost:8081'
)
{code}
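To make the field-name disambiguation concrete, the docs could follow the DDL with a query that reads both copies of the id. A sketch (the query shape is illustrative only, not from the existing docs):

{code:java}
-- 'from_kafka_key_user_id' is deserialized from the Kafka key,
-- 'user_id' from the Avro value; both are plain columns of the table
-- and can be selected or compared side by side.
SELECT
  from_kafka_key_user_id,
  user_id,
  behavior
FROM INPUT_TABLE
{code}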