[jira] [Created] (FLINK-22449) Casting an invalid constant string to int throws exception from SinkNotNullEnforcer

Shang Yuanchun (Jira)
Caizhi Weng created FLINK-22449:
-----------------------------------

             Summary: Casting an invalid constant string to int throws exception from SinkNotNullEnforcer
                 Key: FLINK-22449
                 URL: https://issues.apache.org/jira/browse/FLINK-22449
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.13.0
            Reporter: Caizhi Weng


Add the following test case to {{CalcITCase}} to reproduce this bug:

{code:scala}
@Test
def myTest(): Unit = {
  checkResult("SELECT CAST('haha' AS INT)", Seq(row(null)))
}
{code}

The exception stack trace is:
{code}
Caused by: org.apache.flink.table.api.TableException: Column 'EXPR$0' is NOT NULL, however, a null value is being written into it. You can set job configuration 'table.exec.sink.not-null-enforcer'='drop' to suppress this exception and drop such records silently.
        at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:56)
        at org.apache.flink.table.runtime.operators.sink.SinkNotNullEnforcer.filter(SinkNotNullEnforcer.java:30)
        at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at BatchExecCalc$33.processElement(Unknown Source)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.pushToOperator(ChainingOutput.java:112)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:93)
        at org.apache.flink.streaming.runtime.tasks.ChainingOutput.collect(ChainingOutput.java:39)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:317)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:411)
        at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)
{code}

This happens because the result type of CAST is inferred as NOT NULL (see {{SqlCastFunction#inferReturnType}} and {{StandardConvertletTable#convertCast}}; the nullability of the result is the same as that of the input argument), whereas parsing an invalid string to int produces a null value.
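
The mismatch described above can be illustrated with a small standalone model. This is only a sketch: {{ColumnType}}, {{inferCastType}}, {{castToInt}} and {{enforce}} are hypothetical stand-ins invented for illustration, not Flink or Calcite classes, and the sketch assumes the string constant is typed as NOT NULL.

{code:scala}
// Hypothetical stand-in for a SQL column type; not a Flink or Calcite class.
final case class ColumnType(name: String, nullable: Boolean)

object CastNullabilitySketch {
  // Models the inference described in the issue: the CAST result
  // inherits the nullability of its input argument.
  def inferCastType(operand: ColumnType, target: String): ColumnType =
    ColumnType(target, operand.nullable)

  // Models the runtime behaviour: an unparsable string yields null
  // (represented as None here) instead of failing.
  def castToInt(s: String): Option[Int] =
    try Some(s.trim.toInt)
    catch { case _: NumberFormatException => None }

  // Simplified not-null check in the spirit of SinkNotNullEnforcer.
  def enforce(tpe: ColumnType, value: Option[Int]): Either[String, Option[Int]] =
    if (!tpe.nullable && value.isEmpty)
      Left("column is NOT NULL, however, a null value is being written into it")
    else
      Right(value)

  def main(args: Array[String]): Unit = {
    // Assumption: the constant 'haha' is typed as a NOT NULL string.
    val literal = ColumnType("CHAR(4)", nullable = false)
    val inferred = inferCastType(literal, "INT") // declared as INT NOT NULL
    val value = castToInt("haha")                // null at runtime
    println(enforce(inferred, value))            // the check rejects the record
  }
}
{code}

The declared type and the runtime value disagree, which is the condition the sink check reports.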

One fix I can think of is to make the result type of CAST always nullable (at least for specific kinds of casts, for example from string to int). However, CAST is a very low-level function, so this might have a big impact (for example, if a rule adds a cast, the resulting row type might no longer equal the original row type because of the nullability mismatch).
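
The row type concern can be sketched the same way. Again, {{Field}} and {{castField}} are hypothetical names used only for illustration; the sketch assumes a rewrite that wraps an existing column in a cast to its own type and an equality check that includes nullability.

{code:scala}
// Hypothetical stand-in for one field of a row type; not a Flink or Calcite class.
final case class Field(name: String, typeName: String, nullable: Boolean)

object AlwaysNullableCastSketch {
  // Models the proposed change: the result of CAST is always nullable.
  def castField(f: Field, target: String): Field =
    f.copy(typeName = target, nullable = true)

  def main(args: Array[String]): Unit = {
    val original = Seq(Field("a", "INT", nullable = false))
    // A hypothetical rewrite that adds CAST(a AS INT), which does not change the values.
    val rewritten = original.map(castField(_, "INT"))
    // The row types now differ only in nullability, so an equality check fails.
    println(original == rewritten)
  }
}
{code}

Any check that expects the rewritten plan to keep the original row type would have to tolerate this difference.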

So it seems that, at the current stage, we should make all columns in a select sink nullable. However, this means that one cannot truly rely on the nullability of any result type.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)