Mai created FLINK-12898:
--------------------------- Summary: FlinkSQL job fails when rowTime field encounters dirty data Key: FLINK-12898 URL: https://issues.apache.org/jira/browse/FLINK-12898 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.7.2 Reporter: Mai I use FlinkSQL to process Kafka data in the following format: | id | server_time | | 1 | 2019-05-15 10:00:00 | | 2 | 2019-05-15 10:00:00 | ....... and I define rowtime from the server_time field: new Schema() .field("rowtime", Types.SQL_TIMESTAMP) .rowtime(new Rowtime().timestampsFromField("server_time")) .field("id", Types.String) .field("server_time", Types.String) when dirty data arrives, such as : | id | server_time | | 99 | 11.22.33.44 | My FlinkSQL job fails with exception: java.lang.NumberFormatException: For input string: "11.22.33.44" at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65) at java.lang.Integer.parseInt(Integer.java:580) at java.lang.Integer.parseInt(Integer.java:615) at org.apache.calcite.avatica.util.DateTimeUtils.dateStringToUnixDate(DateTimeUtils.java:625) at org.apache.calcite.avatica.util.DateTimeUtils.timestampStringToUnixDate(DateTimeUtils.java:715) at DataStreamSourceConversion$288.processElement(Unknown Source) at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70) at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:310) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:409) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.emitRecord(KafkaFetcher.java:187) at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:152) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:665) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:94) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:58) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:99) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704) at java.lang.Thread.run(Thread.java:748) Because my flink job use EXACTLY_ONCE, so the job is re-executed from the last checkpoint, consumes dirty data again, fails again, and keeps looping like this. -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |