[jira] [Created] (FLINK-16639) Flink SQL Kafka source connector, add the no json format filter params when format.type is json

Shang Yuanchun (Jira)
LakeShen created FLINK-16639:
--------------------------------

             Summary: Flink SQL Kafka source connector, add the no json format filter params when format.type is json
                 Key: FLINK-16639
                 URL: https://issues.apache.org/jira/browse/FLINK-16639
             Project: Flink
          Issue Type: Improvement
          Components: Connectors / Kafka
            Reporter: LakeShen
             Fix For: 1.10.2


In my view, the Kafka source connector is one of the most frequently used connectors in Flink SQL. It supports JSON, CSV, and other data formats, but there is a problem with the JSON format. For example, consider a Flink SQL Kafka source DDL like this:
CREATE TABLE team_olap_table (
    a varchar,
    b varchar
  )
  with (
    'connector.type' = 'kafka',
    'connector.version' = '0.10',
    'connector.topic' = 'topics',
    'connector.properties.0.key' = 'group.id',
    'connector.properties.0.value' = 'hello_world',
    'connector.properties.1.key' = 'bootstrap.servers',
    'connector.properties.1.value' = 'xxx',
    'connector.property-version' = '1',
    'connector.startup-mode' = 'latest-offset',
    'format.type' = 'json',
    'format.property-version' = '1',
    'format.derive-schema' = 'true',
    'update-mode' = 'append'
);

If even one or two messages in the Kafka topic are not valid JSON, the Flink SQL job fails over continuously.
To solve this problem, when the Flink SQL source connector uses the JSON format, I would like to add a 'format.fail-on-not-json-record' parameter to the flink-json module. If this parameter is true (the default), Flink fails when it reads a non-JSON record; if it is false, the Flink SQL job filters out the non-JSON records and keeps running normally.
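
The intended behaviour of the proposed flag can be sketched in a few lines of Python. This is only an illustration of the semantics described above, not Flink code and not the flink-json implementation: the function name deserialize_records and the argument name fail_on_not_json_record are hypothetical and simply mirror the proposed 'format.fail-on-not-json-record' parameter.

```python
import json
from typing import Any, Iterable, Iterator


def deserialize_records(
    raw_records: Iterable[bytes],
    fail_on_not_json_record: bool = True,
) -> Iterator[Any]:
    """Parse each raw message value as JSON.

    Hypothetical stand-in for the JSON deserialization step of the source.
    With the flag set to True (the proposed default) the first non-JSON
    record raises and the job would fail; with False it is skipped.
    """
    for raw in raw_records:
        try:
            row = json.loads(raw)
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both subclass ValueError.
            if fail_on_not_json_record:
                raise
            continue  # filter out the non-JSON record and keep going
        yield row


# Example input: two valid JSON messages with one plain-text message between them.
messages = [b'{"a": "1", "b": "2"}', b'plain text', b'{"a": "3", "b": "4"}']
rows = list(deserialize_records(messages, fail_on_not_json_record=False))
```

With the flag set to false, the plain-text message is dropped and the two JSON records are emitted. With the default of true, the same input raises on the second message, which corresponds to the repeated fail-over described above.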



--
This message was sent by Atlassian Jira
(v8.3.4#803005)