[jira] [Created] (FLINK-16334) flink-sql kafka-connector: support ignoring invalid data when parsing bytes to JSON rows

Shang Yuanchun (Jira)
roncenzhao created FLINK-16334:
----------------------------------

             Summary: flink-sql kafka-connector: support ignoring invalid data when parsing bytes to JSON rows
                 Key: FLINK-16334
                 URL: https://issues.apache.org/jira/browse/FLINK-16334
             Project: Flink
          Issue Type: Wish
          Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
    Affects Versions: 1.10.0
         Environment: flink1.10+kafka+json
            Reporter: roncenzhao


We found that, if we create a table like this:

{code:java}
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_topic',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.zookeeper.connect' = 'xxx',
  'connector.properties.group.id' = 'g_test',
  --'connector.startup-mode' = 'earliest-offset',
  --'connector.startup-mode' = 'latest-offset',
  'connector.startup-mode' = 'group-offsets',
  'format.type' = 'json',
  'format.fail-on-missing-field' = 'false'
);
{code}
If we execute `select * from MyUserTable` and a row in the topic is not valid JSON (for example, a plain-text record such as `hello`), the job fails, and the offset of the consumer group is reset to the latest offset.

I think we should add a configuration option analogous to 'format.fail-on-missing-field', e.g. 'format.fail-on-invalid-json', to ignore the current invalid row instead of failing the job.
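For illustration, a minimal sketch of what the table definition could look like with such an option; note that 'format.fail-on-invalid-json' is only the name proposed here and does not exist in Flink 1.10:

{code:java}
CREATE TABLE MyUserTable (
  id BIGINT,
  name STRING
) WITH (
  'connector.type' = 'kafka',
  'connector.version' = 'universal',
  'connector.topic' = 'test_topic',
  'connector.properties.bootstrap.servers' = 'xxx',
  'connector.properties.group.id' = 'g_test',
  'connector.startup-mode' = 'group-offsets',
  'format.type' = 'json',
  -- existing option: do not fail when a declared field is missing
  'format.fail-on-missing-field' = 'false',
  -- proposed option: 'false' would skip rows that are not valid JSON
  -- instead of failing the whole job
  'format.fail-on-invalid-json' = 'false'
);
{code}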

Looking forward to your reply!

--
This message was sent by Atlassian Jira
(v8.3.4#803005)