[jira] [Created] (FLINK-19786) Flink doesn't set proper nullability for Logical types for Confluent Avro Serialization


Shang Yuanchun (Jira)
Maciej Bryński created FLINK-19786:
--------------------------------------

             Summary: Flink doesn't set proper nullability for Logical types for Confluent Avro Serialization
                 Key: FLINK-19786
                 URL: https://issues.apache.org/jira/browse/FLINK-19786
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.12.0
            Reporter: Maciej Bryński


When Flink registers a schema in the Confluent Schema Registry, nullability is not set correctly for logical types.
Example table:


{code:sql}
create table `test_logical_null` (
        `string_field` STRING,
        `timestamp_field` TIMESTAMP(3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-logical-null',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test12345',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
  'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
  'avro-confluent.schema-registry.subject' = 'test-logical-null' -- Subject name to write to the Schema Registry service; required for sinks
)
{code}

Registered schema:


{code:json}
{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "string_field",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "timestamp_field",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
{code}

For NOT NULL fields:

{code:sql}
create table `test_logical_notnull` (
        `string_field` STRING NOT NULL,
        `timestamp_field` TIMESTAMP(3) NOT NULL
) WITH (
  'connector' = 'kafka',
  'topic' = 'test-logical-notnull',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'test12345',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
  'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
  'avro-confluent.schema-registry.subject' = 'test-logical-notnull-value' -- Subject name to write to the Schema Registry service; required for sinks
);
{code}
Registered schema:

{code:json}
{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "string_field",
      "type": "string"
    },
    {
      "name": "timestamp_field",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
{code}
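As we can see, string_field gets a proper union with null when the column is nullable and a plain string when it is NOT NULL. For timestamp_field the union is missing in both examples, so the nullable TIMESTAMP(3) column is registered as if it were NOT NULL.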
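
The check can be reproduced outside Flink with a small script. This is only a sketch: the schema text is copied from the nullable example above, the helper names `is_nullable` and `nullability_by_field` are made up for illustration, and `EXPECTED_TIMESTAMP_TYPE` is my assumption of what the nullable timestamp field would look like if it followed the same union pattern as string_field. It is not taken from Flink's output.

```python
import json

# Schema registered for the table with nullable columns (copied from the report).
REGISTERED_SCHEMA = """
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "string_field", "type": ["string", "null"]},
    {"name": "timestamp_field",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
"""

# Assumption: a nullable logical type wrapped in a union, mirroring string_field.
EXPECTED_TIMESTAMP_TYPE = [
    {"type": "long", "logicalType": "timestamp-millis"},
    "null",
]


def is_nullable(avro_type):
    """Return True if the Avro type is a union (JSON array) that contains "null"."""
    return isinstance(avro_type, list) and "null" in avro_type


def nullability_by_field(schema_json):
    """Map each field name of a record schema to whether its type allows null."""
    schema = json.loads(schema_json)
    return {field["name"]: is_nullable(field["type"]) for field in schema["fields"]}


if __name__ == "__main__":
    # Prints {'string_field': True, 'timestamp_field': False}
    print(nullability_by_field(REGISTERED_SCHEMA))
    # Prints True for the assumed union form of the timestamp field
    print(is_nullable(EXPECTED_TIMESTAMP_TYPE))
```

Running it against the schema from the nullable table reports timestamp_field as not nullable, which is the mismatch described above.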


