Dawid Wysakowicz created FLINK-20306:
----------------------------------------

             Summary: Accessing a versioned table as of time fails with a cryptic message
                 Key: FLINK-20306
                 URL: https://issues.apache.org/jira/browse/FLINK-20306
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.12.0
            Reporter: Dawid Wysakowicz
             Fix For: 1.12.0

I tried running a query on a versioned table:

{code}
CREATE TABLE RatesHistory (
    currency_time TIMESTAMP(3) METADATA FROM 'timestamp',
    currency STRING,
    rate DECIMAL(38, 10),
    WATERMARK FOR currency_time AS currency_time -- defines the event time
) WITH (
    'connector' = 'kafka',
    'topic' = 'rates',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kafka:9092',
    'format' = 'json' -- this is an append only source
);

SELECT * from RatesHistory FOR SYSTEM_TIME AS OF TIMESTAMP '2020-11-11 13:12:13';
{code}

I understand that might not be supported now, but the exception I got is not very helpful:

{code}
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalSnapshot[convention: LOGICAL -> STREAM_PHYSICAL]
There is 1 empty subset: rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
977:FlinkLogicalSnapshot(period=[2020-11-11 13:12:13])
  975:FlinkLogicalCalc(subset=[rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate])
    962:FlinkLogicalTableSourceScan(subset=[rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[currency, rate, timestamp])

Root: rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalLegacySink(subset=[rel#117:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], name=[`default_catalog`.`default_database`.`_tmp_table_1885690557`], fields=[currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 127
  FlinkLogicalCalc(subset=[rel#126:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(currency_time) AS currency_time, currency, CAST(rate) AS rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 129
    FlinkLogicalWatermarkAssigner(subset=[rel#124:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], rowtime=[currency_time], watermark=[$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 123
      FlinkLogicalCalc(subset=[rel#122:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(timestamp) AS currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 128
        FlinkLogicalTableSourceScan(subset=[rel#120:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, timestamp]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 119

Sets:
Set#42, type: RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)
    rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#962
        rel#962:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
    rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#983
        rel#983:StreamExecTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Set#43, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)
    rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#975
        rel#975:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
    rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#985
        rel#985:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Set#44, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)
    rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#977
        rel#977:FlinkLogicalSnapshot.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#976,period=2020-11-11 13:12:13), rowcount=1.0E8, cumulative cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}
    rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
Set#45, type: RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate) f1)
    rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#979
        rel#979:FlinkLogicalLegacySink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate), rowcount=1.0E8, cumulative cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}
    rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
        rel#982:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
        rel#988:StreamExecLegacySink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate), rowcount=1.0E8, cumulative cost={inf}

Graphviz:
digraph G {
    root [style=filled,label="Root"];
    subgraph cluster42{
        label="Set 42 RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)";
        rel962 [label="rel#962:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel983 [label="rel#983:StreamExecTableSourceScan\ntable=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset974 [label="rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset984 [label="rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
    }
    subgraph cluster43{
        label="Set 43 RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
        rel975 [label="rel#975:FlinkLogicalCalc\ninput=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel985 [label="rel#985:StreamExecCalc\ninput=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset976 [label="rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset986 [label="rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
    }
    subgraph cluster44{
        label="Set 44 RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
        rel977 [label="rel#977:FlinkLogicalSnapshot\ninput=RelSubset#976,period=2020-11-11 13:12:13\nrows=1.0E8, cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        subset978 [label="rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset987 [label="rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]",color=red]
    }
    subgraph cluster45{
        label="Set 45 RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate) f1)";
        rel979 [label="rel#979:FlinkLogicalLegacySink\ninput=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate\nrows=1.0E8, cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
        rel982 [label="rel#982:AbstractConverter\ninput=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
        rel988 [label="rel#988:StreamExecLegacySink\ninput=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate\nrows=1.0E8, cost={inf}",shape=box]
        subset980 [label="rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE]"]
        subset981 [label="rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
    }
    root -> subset981;
    subset974 -> rel962[color=blue];
    subset984 -> rel983[color=blue];
    subset976 -> rel975[color=blue];
    rel975 -> subset974[color=blue];
    subset986 -> rel985[color=blue];
    rel985 -> subset984[color=blue];
    subset978 -> rel977[color=blue];
    rel977 -> subset976[color=blue];
    subset980 -> rel979[color=blue];
    rel979 -> subset978[color=blue];
    subset981 -> rel982;
    rel982 -> subset980;
    subset981 -> rel988;
    rel988 -> subset987;
}
{code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)