[jira] [Created] (FLINK-19818) ArrayIndexOutOfBoundsException occus in 'Interval Joins' when the source table have nest json

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-19818) ArrayIndexOutOfBoundsException occus in 'Interval Joins' when the source table have nest json

Shang Yuanchun (Jira)
shizhengchao created FLINK-19818:
------------------------------------

             Summary: ArrayIndexOutOfBoundsException occus in 'Interval Joins' when the source table have nest json
                 Key: FLINK-19818
                 URL: https://issues.apache.org/jira/browse/FLINK-19818
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.11.2
            Reporter: shizhengchao


I get an *ArrayIndexOutOfBoundsException* in *Interval Joins*, when my table source have nest json. as the follows is my test:

{code:sql}
CREATE TABLE Orders (
  nest     ROW<
    id            BIGINT,
    consumerName  STRING,
    price         DECIMAL(10, 5),
    productName   STRING
  >,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Orders',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

DROP TABLE IF EXISTS Shipments;
CREATE TABLE Shipments (
  id            BIGINT,
  orderId       BIGINT,
  origin        STRING,
  destnation    STRING,
  isArrived     BOOLEAN,
  proctime AS PROCTIME()
) WITH (
  'connector' = 'kafka-0.11',
  'topic' = 'Shipments',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

DROP TABLE IF EXISTS print;
CREATE TABLE print (
  orderId       BIGINT,
  consumerName  STRING,
  price         DECIMAL(10, 5),
  productName   STRING,
  origin        STRING,
  destnation    STRING,
  isArrived     BOOLEAN
) WITH (
  'connector' = 'print'
);

DROP VIEW IF EXISTS IntervalJoinView;
CREATE VIEW IntervalJoinView AS
SELECT
  o.id,
  o.consumerName,
  o.price,
  o.productName,
  s.origin,
  s.destnation,
  s.isArrived
FROM
  (SELECT * FROM Orders) o,
  (SELECT * FROM Shipments) s
WHERE s.orderId = o.id AND o.proctime BETWEEN s.proctime - INTERVAL '4' HOUR AND s.proctime;

INSERT INTO print
SELECT
  id,
  consumerName,
  price,
  productName,
  origin,
  destnation,
  isArrived
FROM IntervalJoinView;
{code}

The following is the exception of flinkļ¼š

{code:log}
Exception in thread "main" java.lang.ArrayIndexOutOfBoundsException: -1
        at java.util.ArrayList.elementData(ArrayList.java:422)
        at java.util.ArrayList.get(ArrayList.java:435)
        at org.apache.calcite.sql.validate.SelectNamespace.getMonotonicity(SelectNamespace.java:73)
        at org.apache.calcite.sql.SqlIdentifier.getMonotonicity(SqlIdentifier.java:375)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectList(SqlToRelConverter.java:4132)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:685)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertViewQuery(SqlToOperationConverter.java:696)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateView(SqlToOperationConverter.java:665)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:228)
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
        at com.fcbox.streaming.sql.submit.StreamingJob.callExecuteSql(StreamingJob.java:239)
        at com.fcbox.streaming.sql.submit.StreamingJob.callCommand(StreamingJob.java:207)
        at com.fcbox.streaming.sql.submit.StreamingJob.run(StreamingJob.java:133)
        at com.fcbox.streaming.sql.submit.StreamingJob.main(StreamingJob.java:77)
{code}




--
This message was sent by Atlassian Jira
(v8.3.4#803005)