[jira] [Created] (FLINK-20131) Error when using LAST_VALUE on two different datatypes in same query with over window

Shang Yuanchun (Jira)
Thilo Schneider created FLINK-20131:
---------------------------------------

             Summary: Error when using LAST_VALUE on two different datatypes in same query with over window
                 Key: FLINK-20131
                 URL: https://issues.apache.org/jira/browse/FLINK-20131
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.12.0
            Reporter: Thilo Schneider


When "LAST_VALUE(x) OVER w" is applied to columns of two different data types within the same query, the job crashes with a ClassCastException such as "java.lang.Integer cannot be cast to java.lang.Double" (the exact types in the message depend on the columns used).
{code:python}
# Setup: streaming table environment with the Blink planner
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)

t_env.execute_sql("""
CREATE TABLE datagen (
 foo INT,
 val AS cast(foo AS double),
 message_time AS to_timestamp(from_unixtime(foo)),
 WATERMARK FOR message_time AS message_time
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='2',
 'fields.foo.kind'='sequence',
 'fields.foo.start'='0',
 'fields.foo.end'='19'
)""")
{code}
This works:
{code:python}
t = t_env.sql_query("SELECT last_value(val), last_value(foo) AS lagfoo FROM datagen")
t_env.execute_sql("CREATE TABLE output (a double, foo INT) WITH ('connector' = 'print')")
t.execute_insert("output")
{code}
This doesn't work (the only difference is the OVER window):
{code:python}
t = t_env.sql_query("SELECT last_value(val) OVER w, last_value(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)")
t_env.execute_sql("CREATE TABLE output (a double, foo INT) WITH ('connector' = 'print')")
t.execute_insert("output")
{code}
The resulting stacktrace:
{code:java}
java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
        at org$apache$flink$table$planner$functions$aggfunctions$LastValueWithRetractAggFunction$LastValueWithRetractAccumulator$Converter.toInternal(Unknown Source) ~[?:?]
        at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:92) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.table.data.conversion.StructuredObjectConverter.toInternal(StructuredObjectConverter.java:47) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.table.data.conversion.DataStructureConverter.toInternalOrNull(DataStructureConverter.java:59) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
        at BoundedOverAggregateHelper$43.getAccumulators(Unknown Source) ~[?:?]
        at org.apache.flink.table.runtime.operators.over.RowTimeRowsBoundedPrecedingFunction.onTimer(RowTimeRowsBoundedPrecedingFunction.java:303) ~[flink-table-blink_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.invokeUserFunction(KeyedProcessOperator.java:94) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.KeyedProcessOperator.onEventTime(KeyedProcessOperator.java:72) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.advanceWatermark(InternalTimerServiceImpl.java:276) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.InternalTimeServiceManagerImpl.advanceWatermark(InternalTimeServiceManagerImpl.java:183) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:600) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitWatermark(OneInputStreamTask.java:199) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:95) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:181) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:152) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:372) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:186) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:577) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:541) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) ~[flink-dist_2.11-1.12.0.jar:1.12.0]
        at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_141]
{code}
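Because the exact cast error depends on the column types involved, it can help to generate the same pair of queries for other columns. The following is a minimal sketch and not part of the original report: `build_last_value_query` is a hypothetical helper that only assembles the SQL string, and its defaults assume the `datagen` table and `message_time` column from the setup above.

```python
def build_last_value_query(columns, table="datagen", order_by="message_time",
                           over_window=True):
    """Assemble a LAST_VALUE query over the given columns (hypothetical helper)."""
    # With over_window=True every aggregate is evaluated over the named window w.
    suffix = " OVER w" if over_window else ""
    select_list = ", ".join(
        f"last_value({col}){suffix} AS last_{col}" for col in columns
    )
    query = f"SELECT {select_list} FROM {table}"
    if over_window:
        # Same window definition as in the failing query of the report.
        query += (
            f" WINDOW w AS (ORDER BY {order_by}"
            " ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)"
        )
    return query


# Variant with the OVER window (the shape that fails in the report)
failing_query = build_last_value_query(["val", "foo"])
# Variant without the OVER window (the shape that works in the report)
working_query = build_last_value_query(["val", "foo"], over_window=False)
print(failing_query)
print(working_query)
```

Either string can be passed to `t_env.sql_query(...)` from the setup block; swapping in columns of other types should show which cast appears in the exception.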


