[jira] [Created] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-15935) Unable to use watermark when depends both on flink planner and blink planner

Shang Yuanchun (Jira)
Jeff Zhang created FLINK-15935:
----------------------------------

             Summary: Unable to use watermark when depends both on flink planner and blink planner
                 Key: FLINK-15935
                 URL: https://issues.apache.org/jira/browse/FLINK-15935
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.10.0
            Reporter: Jeff Zhang


Run the following code in module `flink-table-examples` (must be under this module)
{code:java}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.examples.java;


import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.java.StreamTableEnvironment;

/**
 * javadoc.
 */
public class TableApiExample {

   /**
    *
    * @param args
    * @throws Exception
    */
   public static void main(String[] args) throws Exception {

      StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
      bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
      StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
      bsTableEnv.sqlUpdate( "CREATE TABLE sink_kafka (\n" +
         "    status  STRING,\n" +
         "    direction STRING,\n" +
         "    event_ts TIMESTAMP(3),\n" +
         "    WATERMARK FOR event_ts AS event_ts - INTERVAL '5' SECOND\n" +
         ") WITH (\n" +
         "  'connector.type' = 'kafka',       \n" +
         "  'connector.version' = 'universal',    \n" +
         "  'connector.topic' = 'generated.events2',\n" +
         "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
         "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
         "  'connector.properties.group.id' = 'testGroup',\n" +
         "  'format.type'='json',\n" +
         "  'update-mode' = 'append'\n" +
         ")\n");

   }
}
 {code}
 

And hit the following error:
{code:java}

Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts'Exception in thread "main" org.apache.calcite.runtime.CalciteContextException: From line 5, column 31 to line 5, column 38: Unknown identifier 'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:834) at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:819) at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:4841) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5667) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlIdentifier.accept(SqlIdentifier.java:317) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.SqlOperator.deriveType(SqlOperator.java:501) at org.apache.calcite.sql.SqlBinaryOperator.deriveType(SqlBinaryOperator.java:144) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5600) at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5587) at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1691) at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1676) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:947) at org.apache.calcite.sql.validate.SqlValidatorImpl.validateParameterizedExpression(SqlValidatorImpl.java:930) at org.apache.flink.table.planner.operations.SqlToOperationConverter.lambda$createTableSchema$8(SqlToOperationConverter.java:509) at java.util.Optional.ifPresent(Optional.java:159) at org.apache.flink.table.planner.operations.SqlToOperationConverter.createTableSchema(SqlToOperationConverter.java:505) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertCreateTable(SqlToOperationConverter.java:177) at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:130) at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66) at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:484) at org.apache.flink.table.examples.java.TableApiExample.main(TableApiExample.java:43)Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Unknown identifier 'event_ts' at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:423) at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:463) at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:572) ... 25 more {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)