[jira] [Created] (FLINK-20100) Lag aggregate function does not return lag, but current row


Shang Yuanchun (Jira)
Thilo Schneider created FLINK-20100:
---------------------------------------

             Summary: Lag aggregate function does not return lag, but current row
                 Key: FLINK-20100
                 URL: https://issues.apache.org/jira/browse/FLINK-20100
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.11.2
            Reporter: Thilo Schneider


The LAG aggregate function appears to always return the value from the current row instead of the value from the preceding row. The following PyFlink script reproduces the problem:

{code:python}
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)

t_env.execute_sql("""
CREATE TABLE datagen (
 foo INT,
 message_time AS to_timestamp(from_unixtime(foo)),
 WATERMARK FOR message_time AS message_time
) WITH (
 'connector' = 'datagen',
 'rows-per-second'='3',
 'fields.foo.kind'='sequence',
 'fields.foo.start'='1',
 'fields.foo.end'='10'
)""")

t = t_env.sql_query("SELECT foo, lag(foo) OVER w AS lagfoo FROM datagen WINDOW w AS (ORDER BY message_time)")

t_env.execute_sql("CREATE TABLE output (foo INT, lagfoo INT) WITH ('connector' = 'print')")
t.execute_insert("output")
{code}
 

This results in the following output, where lagfoo always equals foo:

 
{code:java}
+I(1,1)  // Expected (1, null)
+I(2,2)  // Expected (2, 1)
+I(3,3)  // Expected (3, 2)
+I(4,4)  // and so on
+I(5,5)
+I(6,6)
+I(7,7)
+I(8,8)
+I(9,9)
+I(10,10)
{code}
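
For reference, the semantics expected from LAG with its default offset of 1 can be sketched in plain Python. This is only an illustration of the expected result and is not Flink code; the helper name expected_lag is hypothetical and does not exist in PyFlink.

```python
from typing import List, Optional, Sequence, Tuple


def expected_lag(values: Sequence[int], offset: int = 1) -> List[Tuple[int, Optional[int]]]:
    """Pair each value with the value `offset` rows earlier, or None if no such row exists.

    Hypothetical helper for illustration only; the input is assumed to be
    already ordered the way the OVER window orders it (by message_time).
    """
    rows = []
    for i, current in enumerate(values):
        # The first `offset` rows have no preceding row to look back at.
        previous = values[i - offset] if i >= offset else None
        rows.append((current, previous))
    return rows


# Same input as the datagen sequence above: foo = 1..10.
for foo, lagfoo in expected_lag(range(1, 11)):
    print(f"+I({foo},{lagfoo})")
```

With this sketch, the first row carries None (the Python counterpart of SQL NULL) and every later row carries the previous foo value, which is what the query above was expected to produce.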
 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)