Alexander Filipchik created FLINK-18603:
-------------------------------------------
Summary: SQL fails with java.lang.IllegalStateException: No operators defined in streaming topology
Key: FLINK-18603
URL: https://issues.apache.org/jira/browse/FLINK-18603
Project: Flink
Issue Type: Bug
Components: Table SQL / Runtime
Affects Versions: 1.11.0
Reporter: Alexander Filipchik
Hi, I was playing with 1.11 and found that code that worked in 1.10.1 fails in 1.11.0 with:
{code:java}
Exception in thread "main" java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.
    at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:47)
    at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
    at com.css.flink.avro.confluent.table.SqlTest.main(SqlTest.java:53)
{code}
Code example:
{code:java}
StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings =
    EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
String createTable =
    String.format(
        "create table EnrichedOrders ("
            + "name VARCHAR,"
            + "proctime AS PROCTIME()"
            + ") with ("
            + " 'connector.type' = 'kafka',"
            + " 'connector.version' = 'universal',"
            + " 'connector.property-version' = '1',"
            + " 'connector.topic' = '%s',"
            + " 'connector.properties.bootstrap.servers' = '%s',"
            + " 'connector.properties.group.id' = '%s',"
            + " 'connector.startup-mode' = 'earliest-offset',"
            + " 'update-mode' = 'append',"
            + " 'format.type' = 'confluent-avro',"
            + " 'format.schema-registry' = '%s'"
            + ")",
        "avro",
        "broker",
        "testSqlLocal",
        "registry");
tEnv.executeSql(createTable);
tEnv.toAppendStream(
        tEnv.sqlQuery(
            "select name, sum(*) "
                + "from EnrichedOrders "
                + "GROUP BY TUMBLE(proctime, INTERVAL '1' MINUTE), name"),
        Row.class)
    .print();
tEnv.execute("testSql");
{code}