Thilo Schneider created FLINK-20100:
---------------------------------------
Summary: Lag aggregate function does not return lag, but current row
Key: FLINK-20100
URL: https://issues.apache.org/jira/browse/FLINK-20100
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.11.2
Reporter: Thilo Schneider
The LAG aggregate function appears to always return the value from the current row instead of the value from the preceding row:
{code:python}
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.execute_sql("""
CREATE TABLE datagen (
foo INT,
message_time AS to_timestamp(from_unixtime(foo)),
WATERMARK FOR message_time AS message_time
) WITH (
'connector' = 'datagen',
'rows-per-second'='3',
'fields.foo.kind'='sequence',
'fields.foo.start'='1',
'fields.foo.end'='10'
)""")
t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)")
t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
t.execute_insert("output")
{code}
This produces the following output:
{code:java}
+I(1,1) // Expected (1, null)
+I(2,2) // Expected (2, 1)
+I(3,3) // Expected (3, 2)
+I(4,4) // and so on
+I(5,5)
+I(6,6)
+I(7,7)
+I(8,8)
+I(9,9)
+I(10,10)
{code}
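For comparison, the expected result can be modelled outside Flink with a few lines of plain Python implementing the usual SQL LAG semantics (value from the row `offset` positions earlier, NULL if there is none). This is only a reference sketch, not PyFlink code: `expected_lag` is a hypothetical helper, and it assumes the rows are already sorted by the ORDER BY key (in the reproduction above, `message_time` increases together with `foo`).

{code:python}
from typing import List, Optional, Sequence, Tuple


def expected_lag(values: Sequence[int], offset: int = 1,
                 default: Optional[int] = None) -> List[Tuple[int, Optional[int]]]:
    """Pair each value with the value `offset` rows earlier (SQL LAG semantics).

    Hypothetical helper for illustration only. Assumes `values` is already
    ordered by the window's ORDER BY key.
    """
    result = []
    for i, value in enumerate(values):
        # Rows without a predecessor `offset` positions back get the default (NULL -> None).
        lagged = values[i - offset] if i >= offset else default
        result.append((value, lagged))
    return result


if __name__ == "__main__":
    # Same input as the datagen source: foo = 1..10.
    for row in expected_lag(list(range(1, 11))):
        print(row)
{code}

With foo = 1..10 this yields (1, None), (2, 1), (3, 2), ..., (10, 9), matching the "Expected" annotations above, whereas Flink 1.11.2 returns (n, n) for every row.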