Thilo Schneider created FLINK-20128:
---------------------------------------
Summary: Data loss for over windows with rows unbounded preceding
Key: FLINK-20128
URL: https://issues.apache.org/jira/browse/FLINK-20128
Project: Flink
Issue Type: Bug
Components: API / Python, Table SQL / Planner
Affects Versions: 1.11.2, 1.12.0
Reporter: Thilo Schneider
When using partitioned over windows with ROWS BETWEEN UNBOUNDED PRECEDING, all partitions but one are dropped from the output:
{code:python}
# Setup
from pyflink.table import EnvironmentSettings, StreamTableEnvironment
env_settings = EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build()
t_env = StreamTableEnvironment.create(environment_settings=env_settings)
t_env.get_config().get_configuration().set_integer("table.exec.resource.default-parallelism", 1)
t_env.execute_sql("""
CREATE TABLE datagen (
foo INT,
id AS mod(foo, 2),
message_time AS to_timestamp(from_unixtime(FLOOR(foo/2))),
WATERMARK FOR message_time AS message_time
) WITH (
'connector' = 'datagen',
'rows-per-second'='2',
'fields.foo.kind'='sequence',
'fields.foo.start'='0',
'fields.foo.end'='19'
)""")
t_env.execute_sql("CREATE TABLE output (foo INT, id INT, lagfoo INT) WITH ('connector' = 'print')")
{code}
Using bounded over windows, everything works as expected:
{code:python}
t = t_env.sql_query("""
SELECT foo, id, avg(foo) OVER w AS lagfoo
FROM datagen
WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN 1 PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}
yields
{code:python}
+I(0,0,0)
+I(1,1,1)
+I(2,0,1)
+I(3,1,2)
+I(4,0,3)
...
{code}
If we change the window to unbounded preceding:
{code:python}
t = t_env.sql_query("""
SELECT foo, id, avg(foo) OVER w AS lagfoo
FROM datagen
WINDOW w AS (PARTITION BY id ORDER BY message_time ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)""")
t.execute_insert("output")
{code}
we lose all rows with id == 1:
{code:python}
+I(0,0,0)
+I(2,0,1)
+I(4,0,2)
+I(6,0,3)
+I(8,0,4)
...
{code}
I observed this problem with various aggregate functions, under both 1.11.2 and 1.12rc1.
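For comparison, the output I would expect from both queries can be reproduced in plain Python without Flink. This is only a minimal sketch, not Flink's implementation: the helper name `expected_over_avg` and its `preceding` parameter are made up for illustration, and it assumes that avg over an INT column uses integer division and that rows arrive in `foo` order (which matches the message_time order within each partition here).

```python
from collections import defaultdict


def expected_over_avg(foos, preceding=None):
    """Reference result for avg(foo) OVER (PARTITION BY id ORDER BY message_time ROWS ...).

    preceding=None models UNBOUNDED PRECEDING, preceding=k models k PRECEDING.
    Hypothetical helper for illustration only.
    """
    history = defaultdict(list)  # rows seen so far, per partition key
    out = []
    for foo in foos:
        key = foo % 2  # mirrors the computed column: id AS mod(foo, 2)
        history[key].append(foo)
        if preceding is None:
            window = history[key]  # every row of the partition so far
        else:
            window = history[key][-(preceding + 1):]  # current row plus k preceding
        # Assumption: avg over INT truncates like integer division.
        out.append((foo, key, sum(window) // len(window)))
    return out


for row in expected_over_avg(range(20)):
    print("+I(%d,%d,%d)" % row)
```

With the unbounded window this prints rows for both partitions, starting with +I(0,0,0), +I(1,1,1), +I(2,0,1), +I(3,1,2), +I(4,0,2). The actual interleaving of partitions in Flink's print sink may differ, but the rows with id == 1 should be present.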
--
This message was sent by Atlassian Jira
(v8.3.4#803005)