Stefano created FLINK-13944:
------------------------------- Summary: Table.toAppendStream: InvalidProgramException: Table program cannot be compiled. Key: FLINK-13944 URL: https://issues.apache.org/jira/browse/FLINK-13944 Project: Flink Issue Type: Bug Components: API / Scala, Table SQL / API Affects Versions: 1.9.0, 1.8.1 Environment: {{$ java -version}} {{ openjdk version "1.8.0_222"}} {{ OpenJDK Runtime Environment (build 1.8.0_222-8u222-b10-1ubuntu1~18.04.1-b10)}} {{ OpenJDK 64-Bit Server VM (build 25.222-b10, mixed mode)}} {{------}} {{$ scala -version}} {{Scala code runner version 2.11.12 -- Copyright 2002-2017, LAMP/EPFL}} {{------}} {{build.}}{{sbt}} [...] ThisBuild / scalaVersion := "2.11.12" val flinkVersion = "1.9.0" val flinkDependencies = Seq( "org.apache.flink" %% "flink-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-streaming-scala" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-planner" % flinkVersion % "provided") [...] Reporter: Stefano Attachments: app.zip {{Using: Scala streaming API and the StreamTableEnvironment.}} Given the classes: {{object EntityType extends Enumeration {}} {{ type EntityType = Value}} {{ val ACTIVITY = Value}} {{}}} {{sealed trait Entity extends Serializable}} {{case class Activity(card_id: Long, date_time: Timestamp, second: Long, station_id: Long, station_name: String, activity_code: Long, amount: Long) extends Entity}} What I try to do is{{ to convert a table after selection to an appendStream.}} {{/** activity table **/}} {{val activityDataStream = partialComputation1}} {{ .filter(_._1 == EntityType.ACTIVITY)}} {{ .map(x => x._3.asInstanceOf[Activity])}} {{tableEnv.registerDataStream("activity", activityDataStream, 'card_id, 'date_time, 'second, 'station_id, 'station_name, 'activity_code, 'amount)}} {{val selectedTable = tableEnv.scan("activity").select("card_id, second")}} {{selectedTable.printSchema()}} {{// root}} {{// |-- card_id: BIGINT}} {{// |-- second: BIGINT}} {{// ATTEMPT 1}} {{// val output = tableEnv.toAppendStream[(Long, Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 2}} {{// val output = tableEnv.toAppendStream[(java.lang.Long, java.lang.Long)](selectedTable)}} {{// output.print}} {{// ATTEMPT 3}} {{// val output = tableEnv.toAppendStream[Row](selectedTable)}} {{// output.print}} {{// ATTEMPT 4}} {{case class Test(card_id: Long, second: Long) extends Entity}}{{val output = tableEnv.toAppendStream[Test](selectedTable)}} {{output.print}} The result for each of the attempts is always the same: {{------------------------------------------- The program finished with the following exception:}} {{org.apache.flink.client.program.ProgramInvocationException: Job failed. (JobID: 334fe364c516008ca34b76e27c5c6f79) at org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:262) at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:338) at ... 23 more Caused by: org.apache.flink.api.common.InvalidProgramException: *Table program cannot be compiled. This is a bug. Please file an issue.* at org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.compile(CRowOutputProcessRunner.scala:36) at org.apache.flink.table.runtime.CRowOutputProcessRunner.open(CRowOutputProcessRunner.scala:50) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:529) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530) at java.lang.Thread.run(Thread.java:748)}} My project in which I face the error is attached. -- This message was sent by Atlassian Jira (v8.3.2#803003) |
Free forum by Nabble | Edit this page |