[jira] [Created] (FLINK-20463) flink-1.11.2 -sql cannot ignore exception record

谢波 created FLINK-20463:
--------------------------

             Summary: flink-1.11.2 -sql cannot ignore exception record
                 Key: FLINK-20463
                 URL: https://issues.apache.org/jira/browse/FLINK-20463
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.11.2
         Environment: <flink.version>1.11.2</flink.version>
<scala.binary.version>2.11</scala.binary.version>
            Reporter: 谢波


Can Flink SQL provide an option to ignore exception records?

I have a table that maps Kafka data in JSON format.

When the exception data shown below is parsed, an exception is thrown, even though the data is valid JSON; it is simply not a valid record for the table schema. The job fails despite 'json.ignore-parse-errors' = 'true' being set in the table definition.

Exception data: {"SHEET":[""]}
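
For comparison, a record that actually matches the table definition below would presumably be shaped like this (all field values and the HEADER key are invented, purely for illustration); in the failing record, the SHEET array contains the bare string "" where such an object is expected:

{
  "SHEET": [
    {
      "HEADER": { "STORE": "0001" },
      "ITEM": [
        {
          "AMOUNT": "1",
          "COST": "10.0",
          "GOODSID": "G001",
          "SALEVALUE": "12.0",
          "SAP_RTMATNR": "M001",
          "SAP_RTPLU": "P001",
          "SERIALID": "S001",
          "SHEETID": "SH001"
        }
      ],
      "ITEM5": [],
      "ITEM1": [],
      "TENDER": []
    }
  ]
}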

My table:

CREATE TABLE offline
(
    SHEET ROW (
        HEADER MAP<STRING, STRING>,
        ITEM ROW (
            AMOUNT STRING,
            COST STRING,
            GOODSID STRING,
            SALEVALUE STRING,
            SAP_RTMATNR STRING,
            SAP_RTPLU STRING,
            SERIALID STRING,
            SHEETID STRING
        ) ARRAY,
        ITEM5 MAP<STRING, STRING> ARRAY,
        ITEM1 MAP<STRING, STRING> ARRAY,
        TENDER MAP<STRING, STRING> ARRAY
    ) ARRAY
)
WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'xxx:9092',
    'properties.group.id' = 'realtime.sales.offline.group',
    'topic' = 'bms133',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
);
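
A trimmed-down sketch that keeps only the ROW-inside-ARRAY structure involved in the failure may help reproduce it in isolation (table name, topic, group id, and bootstrap servers below are placeholders, not taken from the original job); feeding it the single record {"SHEET":[""]} should exercise the same array-of-row deserialization path:

-- minimal sketch; placeholder connector settings
CREATE TABLE repro
(
    SHEET ROW (
        HEADER MAP<STRING, STRING>
    ) ARRAY
)
WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'repro.group',
    'topic' = 'repro-topic',
    'format' = 'json',
    'json.ignore-parse-errors' = 'true',
    'scan.startup.mode' = 'earliest-offset'
);

Publishing {"SHEET":[""]} to the placeholder topic and running a simple SELECT * FROM repro should, if the problem lies in copying the array/row data rather than in JSON parsing itself, lead to the same kind of failure as the stack trace below.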

Exception:

Caused by: java.lang.NullPointerException
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:116)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
    at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copyGenericArray(ArrayDataSerializer.java:129)
    at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:90)
    at org.apache.flink.table.runtime.typeutils.ArrayDataSerializer.copy(ArrayDataSerializer.java:51)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copyRowData(RowDataSerializer.java:156)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:123)
    at org.apache.flink.table.runtime.typeutils.RowDataSerializer.copy(RowDataSerializer.java:50)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:715)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52)
    at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:352)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:185)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:141)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:755)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:213)


