[jira] [Created] (FLINK-20961) Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-20961) Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks

Shang Yuanchun (Jira)
Yuval Itzchakov created FLINK-20961:
---------------------------------------

             Summary: Flink throws NullPointerException for tables created from DataStream with no assigned timestamps and watermarks
                 Key: FLINK-20961
                 URL: https://issues.apache.org/jira/browse/FLINK-20961
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Runtime
    Affects Versions: 1.12.0
            Reporter: Yuval Itzchakov


 

Given the following program:
{code:java}
//import org.apache.flink.api.common.eventtime.{ SerializableTimestampAssigner, WatermarkStrategy }
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.api.{$, AnyWithOperations}
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction}
import java.time.Instant

object BugRepro {
  def text: String =
    s"""
       |{
       |  "s": "hello",
       |  "i": ${Random.nextInt()}
       |}
       |""".stripMargin  def main(args: Array[String]): Unit = {
    val flink =
      StreamExecutionEnvironment.createLocalEnvironment()
    val tableEnv = StreamTableEnvironment.create(flink)
    val dataStream = flink
      .addSource {
        new SourceFunction[(Long, String)] {
          var isRunning = true          
          override def run(ctx: SourceFunction.SourceContext[(Long, String)]): Unit =
            while (isRunning) {
              val x = (Instant.now().toEpochMilli, text)
              ctx.collect(x)
              ctx.emitWatermark(new Watermark(x._1))
              Thread.sleep(300)
            }          
            override def cancel(): Unit =
              isRunning = false
        }
      }
//      .assignTimestampsAndWatermarks(
//        WatermarkStrategy
//          .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(30))
//          .withTimestampAssigner {
//            new SerializableTimestampAssigner[(Long, String)] {
//              override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long =
//                element._1
//            }
//          }
//      )
//
    tableEnv.createTemporaryView("testview", dataStream, $("event_time").rowtime(), $("json_text"))
    val res = tableEnv.sqlQuery("""
                                  |SELECT json_text
                                  |FROM testview
                                  |""".stripMargin)    
    val sink = tableEnv.executeSql(
      """
        |CREATE TABLE SINK (
        |  json_text STRING
        |)
        |WITH (
        |  'connector' = 'print'
        |)
        |""".stripMargin
    )    res.executeInsert("SINK").await()
    ()
  }
    res.executeInsert("SINK").await()

{code}
 

Flink will throw a NullPointerException at runtime:
{code:java}
Caused by: java.lang.NullPointerExceptionCaused by: java.lang.NullPointerException at SourceConversion$3.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305) at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394) at ai.hunters.pipeline.BugRepro$$anon$1.run(BugRepro.scala:78) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:215)
{code}
This is due to the fact that the DataStream did not assign a timestamp to the underlying source. This is the generated code:
{code:java}
public class SourceConversion$3 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {        private final Object[] references;
        private transient org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter converter$0;
        org.apache.flink.table.data.GenericRowData out = new org.apache.flink.table.data.GenericRowData(2);
        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);        public SourceConversion$3(
            Object[] references,
            org.apache.flink.streaming.runtime.tasks.StreamTask task,
            org.apache.flink.streaming.api.graph.StreamConfig config,
            org.apache.flink.streaming.api.operators.Output output,
            org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception {
          this.references = references;
          converter$0 = (((org.apache.flink.table.data.util.DataFormatConverters.CaseClassConverter) references[0]));
          this.setup(task, config, output);
          if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) {
            ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this)
              .setProcessingTimeService(processingTimeService);
          }
        }        @Override
        public void open() throws Exception {
          super.open();
         
        }        @Override
        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
          org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) (org.apache.flink.table.data.RowData) converter$0.toInternal((scala.Tuple2) element.getValue());
         
          org.apache.flink.table.data.TimestampData result$1;
          boolean isNull$1;
          org.apache.flink.table.data.binary.BinaryStringData field$2;
          boolean isNull$2;
          isNull$2 = in1.isNullAt(1);
          field$2 = org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
          if (!isNull$2) {
            field$2 = ((org.apache.flink.table.data.binary.BinaryStringData) in1.getString(1));
          }
         
          ctx.element = element;
         
         
         
          result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp());
          if (result$1 == null) {
            throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " +
              "proper TimestampAssigner is defined and the stream environment uses the EventTime " +
              "time characteristic.");
          }
          isNull$1 = false;
          if (isNull$1) {
            out.setField(0, null);
          } else {
            out.setField(0, result$1);
          }
                   
         
         
          if (isNull$2) {
            out.setField(1, null);
          } else {
            out.setField(1, field$2);
          }
                   
                 
          output.collect(outElement.replace(out));
          ctx.element = null;
         
        }                @Override
        public void close() throws Exception {
           super.close();
         
        }        
      }
{code}
The important line is here:
{code:java}
 result$1 = org.apache.flink.table.data.TimestampData.fromEpochMillis(ctx.timestamp()); if (result$1 == null) { throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
{code}
`ctx.timestamp` returns null in case no timestamp assigner was created, and `TimestampData.fromEpochMillis` expects a primitive `long`, so a deference fails. The actual check should be:
{code:java}
if (!ctx.hasTimestamp) {
  throw new RuntimeException("Rowtime timestamp is null. Please make sure that a " + "proper TimestampAssigner is defined and the stream environment uses the EventTime " + "time characteristic.");
}

result$1 = TimestampData.fromEpochMillis(ctx.timestamp());{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)