LakeShen created FLINK-16639:
--------------------------------
Summary: Flink SQL Kafka source connector, add the no json format filter params when format.type is json
Key: FLINK-16639
URL: https://issues.apache.org/jira/browse/FLINK-16639
Project: Flink
Issue Type: Improvement
Components: Connectors / Kafka
Reporter: LakeShen
Fix For: 1.10.2
In my opinion, the Kafka source connector is one of the most frequently used connectors in Flink SQL. The Flink SQL Kafka source connector supports JSON, CSV, and other data formats. But there is a problem with the JSON format in the Kafka source connector. For example, a Flink SQL Kafka source DDL looks
like this:
CREATE TABLE team_olap_table (
a varchar,
b varchar
)
with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'topics',
'connector.properties.0.key' = 'group.id',
'connector.properties.0.value' = 'hello_world',
'connector.properties.1.key' = 'bootstrap.servers',
'connector.properties.1.value' = 'xxx',
'connector.property-version' = '1',
'connector.startup-mode' = 'latest-offset',
'format.type' = 'json',
'format.property-version' = '1',
'format.derive-schema' = 'true',
'update-mode' = 'append'
);
If even one or two of the Kafka topic messages are not in JSON format, the Flink SQL task will fail over all the time.
In order to solve this problem, when a Flink SQL source connector uses the JSON format, I want to add a 'format.fail-on-not-json-record' param in the flink-json module. If this param is true (the default), Flink will fail when it reads a non-JSON record; if this param is false, the Flink SQL task will filter out non-JSON records and keep running normally.
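To illustrate, a minimal sketch of how the proposed option might appear in the DDL above; note that 'format.fail-on-not-json-record' is the hypothetical key being proposed here, not an existing Flink property:

```sql
CREATE TABLE team_olap_table (
a varchar,
b varchar
)
with (
'connector.type' = 'kafka',
'connector.version' = '0.10',
'connector.topic' = 'topics',
'format.type' = 'json',
'format.derive-schema' = 'true',
-- proposed (hypothetical) option: when false, silently skip
-- records that cannot be parsed as JSON instead of failing over
'format.fail-on-not-json-record' = 'false',
'update-mode' = 'append'
);
```

With this setting, a malformed record would be dropped by the deserialization schema rather than throwing an exception that restarts the whole job.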
--
This message was sent by Atlassian Jira
(v8.3.4#803005)