[jira] [Created] (FLINK-20306) Accessing a versioned table as of time fails with a cryptic message


Shang Yuanchun (Jira)
Dawid Wysakowicz created FLINK-20306:
----------------------------------------

             Summary: Accessing a versioned table as of time fails with a cryptic message
                 Key: FLINK-20306
                 URL: https://issues.apache.org/jira/browse/FLINK-20306
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / Planner
    Affects Versions: 1.12.0
            Reporter: Dawid Wysakowicz
             Fix For: 1.12.0


I tried running a query on a versioned table:

{code}
CREATE TABLE RatesHistory (
    currency_time TIMESTAMP(3) METADATA FROM 'timestamp',
    currency STRING,
    rate DECIMAL(38, 10),
    WATERMARK FOR currency_time AS currency_time   -- defines the event time
) WITH (
  'connector' = 'kafka',
  'topic' = 'rates',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'                                -- this is an append-only source
);

SELECT * FROM RatesHistory FOR SYSTEM_TIME AS OF TIMESTAMP '2020-11-11 13:12:13';
{code}

I understand that this kind of query might not be supported yet, but the exception I got is not very helpful:

{code}
org.apache.calcite.plan.RelOptPlanner$CannotPlanException: There are not enough rules to produce a node with desired properties: convention=STREAM_PHYSICAL, FlinkRelDistributionTraitDef=any, MiniBatchIntervalTraitDef=None: 0, ModifyKindSetTraitDef=[NONE], UpdateKindTraitDef=[NONE].
Missing conversion is FlinkLogicalSnapshot[convention: LOGICAL -> STREAM_PHYSICAL]
There is 1 empty subset: rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], the relevant part of the original plan is as follows
977:FlinkLogicalSnapshot(period=[2020-11-11 13:12:13])
  975:FlinkLogicalCalc(subset=[rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate])
    962:FlinkLogicalTableSourceScan(subset=[rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]]], fields=[currency, rate, timestamp])

Root: rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]
Original rel:
FlinkLogicalLegacySink(subset=[rel#117:RelSubset#4.LOGICAL.any.None: 0.[NONE].[NONE]], name=[`default_catalog`.`default_database`.`_tmp_table_1885690557`], fields=[currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 127
  FlinkLogicalCalc(subset=[rel#126:RelSubset#3.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(currency_time) AS currency_time, currency, CAST(rate) AS rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 129
    FlinkLogicalWatermarkAssigner(subset=[rel#124:RelSubset#2.LOGICAL.any.None: 0.[NONE].[NONE]], rowtime=[currency_time], watermark=[$0]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 123
      FlinkLogicalCalc(subset=[rel#122:RelSubset#1.LOGICAL.any.None: 0.[NONE].[NONE]], select=[CAST(timestamp) AS currency_time, currency, rate]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 0.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 128
        FlinkLogicalTableSourceScan(subset=[rel#120:RelSubset#0.LOGICAL.any.None: 0.[NONE].[NONE]], table=[[default_catalog, default_database, RatesHistory]], fields=[currency, rate, timestamp]): rowcount = 1.0E8, cumulative cost = {1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}, id = 119

Sets:
Set#42, type: RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)
        rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#962
                rel#962:FlinkLogicalTableSourceScan.LOGICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
        rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#983
                rel#983:StreamExecTableSourceScan.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](table=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp), rowcount=1.0E8, cumulative cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Set#43, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)
        rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#975
                rel#975:FlinkLogicalCalc.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
        rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=rel#985
                rel#985:StreamExecCalc.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate), rowcount=1.0E8, cumulative cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}
Set#44, type: RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)
        rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#977
                rel#977:FlinkLogicalSnapshot.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#976,period=2020-11-11 13:12:13), rowcount=1.0E8, cumulative cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}
        rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
Set#45, type: RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate) f1)
        rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE], best=rel#979
                rel#979:FlinkLogicalLegacySink.LOGICAL.any.None: 0.[NONE].[NONE](input=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate), rowcount=1.0E8, cumulative cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}
        rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE], best=null
                rel#982:AbstractConverter.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]), rowcount=1.0E8, cumulative cost={inf}
                rel#988:StreamExecLegacySink.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE](input=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate), rowcount=1.0E8, cumulative cost={inf}

Graphviz:
digraph G {
        root [style=filled,label="Root"];
        subgraph cluster42{
                label="Set 42 RecordType(VARCHAR(2147483647) currency, DECIMAL(38, 10) rate, TIMESTAMP_WITH_LOCAL_TIME_ZONE(3) timestamp)";
                rel962 [label="rel#962:FlinkLogicalTableSourceScan\ntable=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
                rel983 [label="rel#983:StreamExecTableSourceScan\ntable=[default_catalog, default_database, RatesHistory, watermark=[CAST($2):TIMESTAMP(3)]],fields=currency, rate, timestamp\nrows=1.0E8, cost={1.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
                subset974 [label="rel#974:RelSubset#42.LOGICAL.any.None: 0.[NONE].[NONE]"]
                subset984 [label="rel#984:RelSubset#42.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
        }
        subgraph cluster43{
                label="Set 43 RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
                rel975 [label="rel#975:FlinkLogicalCalc\ninput=RelSubset#974,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
                rel985 [label="rel#985:StreamExecCalc\ninput=RelSubset#984,select=CAST(Reinterpret(CAST(timestamp))) AS currency_time, currency, CAST(rate) AS rate\nrows=1.0E8, cost={2.0E8 rows, 1.0E8 cpu, 3.6E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
                subset976 [label="rel#976:RelSubset#43.LOGICAL.any.None: 0.[NONE].[NONE]"]
                subset986 [label="rel#986:RelSubset#43.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
        }
        subgraph cluster44{
                label="Set 44 RecordType(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate)";
                rel977 [label="rel#977:FlinkLogicalSnapshot\ninput=RelSubset#976,period=2020-11-11 13:12:13\nrows=1.0E8, cost={3.0E8 rows, 2.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
                subset978 [label="rel#978:RelSubset#44.LOGICAL.any.None: 0.[NONE].[NONE]"]
                subset987 [label="rel#987:RelSubset#44.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]",color=red]
        }
        subgraph cluster45{
                label="Set 45 RecordType:peek_no_expand(BOOLEAN f0, RecordType:peek_no_expand(TIMESTAMP(3) currency_time, VARCHAR(2147483647) currency, DECIMAL(38, 18) rate) f1)";
                rel979 [label="rel#979:FlinkLogicalLegacySink\ninput=RelSubset#978,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate\nrows=1.0E8, cost={4.0E8 rows, 3.0E8 cpu, 7.2E9 io, 0.0 network, 0.0 memory}",color=blue,shape=box]
                rel982 [label="rel#982:AbstractConverter\ninput=RelSubset#980,convention=STREAM_PHYSICAL,FlinkRelDistributionTraitDef=any,MiniBatchIntervalTraitDef=None: 0,ModifyKindSetTraitDef=[NONE],UpdateKindTraitDef=[NONE]\nrows=1.0E8, cost={inf}",shape=box]
                rel988 [label="rel#988:StreamExecLegacySink\ninput=RelSubset#987,name=`default_catalog`.`default_database`.`_tmp_table_934371579`,fields=currency_time, currency, rate\nrows=1.0E8, cost={inf}",shape=box]
                subset980 [label="rel#980:RelSubset#45.LOGICAL.any.None: 0.[NONE].[NONE]"]
                subset981 [label="rel#981:RelSubset#45.STREAM_PHYSICAL.any.None: 0.[NONE].[NONE]"]
        }
        root -> subset981;
        subset974 -> rel962[color=blue];
        subset984 -> rel983[color=blue];
        subset976 -> rel975[color=blue]; rel975 -> subset974[color=blue];
        subset986 -> rel985[color=blue]; rel985 -> subset984[color=blue];
        subset978 -> rel977[color=blue]; rel977 -> subset976[color=blue];
        subset980 -> rel979[color=blue]; rel979 -> subset978[color=blue];
        subset981 -> rel982; rel982 -> subset980;
        subset981 -> rel988; rel988 -> subset987;
}
{code}
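
For comparison, here is a minimal sketch of the form in which FOR SYSTEM_TIME AS OF is normally used: an event-time temporal join, where the point in time comes from a time attribute of the probe side rather than from a constant. The Orders table, its columns, the 'orders' topic, and the VersionedRates view are hypothetical names introduced only for illustration. Because RatesHistory is append-only and has no primary key, the sketch assumes it is first turned into a versioned view with a deduplication query. Whether this exact combination plans successfully on 1.12.0 has not been verified here.

{code}
-- Hypothetical probe-side table; names and options are assumptions.
CREATE TABLE Orders (
    order_id STRING,
    currency STRING,
    amount DECIMAL(38, 10),
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time         -- event time of the order
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'scan.startup.mode' = 'earliest-offset',
  'properties.bootstrap.servers' = 'kafka:9092',
  'format' = 'json'
);

-- Keep only the latest rate per currency. The deduplication query gives
-- the view an inferred unique key (currency) and an event-time attribute.
CREATE VIEW VersionedRates AS
SELECT currency, rate, currency_time
FROM (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY currency
                              ORDER BY currency_time DESC) AS rownum
    FROM RatesHistory
)
WHERE rownum = 1;

-- Temporal join: each order sees the rate that was valid at its order_time.
SELECT o.order_id, o.amount, r.rate
FROM Orders AS o
JOIN VersionedRates FOR SYSTEM_TIME AS OF o.order_time AS r
ON o.currency = r.currency;
{code}

The query in this report differs from the sketch in two ways: it uses a constant timestamp, and it applies the snapshot directly to a table without a primary key. That is consistent with the missing FlinkLogicalSnapshot conversion shown in the exception above.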



--
This message was sent by Atlassian Jira
(v8.3.4#803005)