When using Flink 1.9.3 with the Blink planner, I tried to union some tables that contain Chinese string constants and got an OutOfMemoryError (Java heap space). The code and the error message are below.

The same job works when I switch to the old planner, and it also works after upgrading Flink to 1.11.2. In 1.9.3 it also works if I remove either the 'UNION ALL' or the Chinese literals.

I searched JIRA and found FLINK-16113 <https://issues.apache.org/jira/browse/FLINK-16113>, which contains a bugfix related to Chinese characters, but does it also explain the memory problem? And why does UNION ALL matter?
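Switching the 1.9.3 job to the old planner should only require changing the settings builder call. The fragment below is a minimal sketch of that change, assuming the rest of the job (the `env` variable, the imports, the query) stays exactly as in the Blink version; it is not necessarily the exact code that was run.

```java
// Fragment, not a full program: it replaces the EnvironmentSettings lines
// in the Flink 1.9.3 job. `env` is the StreamExecutionEnvironment created
// earlier in main(), and the imports are the same as in the Blink version.
EnvironmentSettings mySetting = EnvironmentSettings.newInstance()
        .useOldPlanner()        // instead of useBlinkPlanner()
        .inStreamingMode()
        .build();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
```

The old planner does not go through the `org.apache.flink.table.planner.codegen` classes that appear in the stack trace, which is consistent with the error not showing up there.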
*Code in Flink 1.9.3:*

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings mySetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
        env.setParallelism(1);

        DataStream<String> oriStream = env.fromElements("test", "union all");
        Table testTable = tableEnv.fromDataStream(oriStream, "text");
        tableEnv.registerTable("test_table", testTable);

        Table regularJoin = tableEnv.sqlQuery(
            "SELECT\n" +
            " text AS text,\n" +
            " '中文' AS another_text\n" +
            "FROM\n" +
            " test_table\n" +
            "UNION ALL\n" +
            "SELECT\n" +
            " 'another_text' AS text,\n" +
            " '中文' AS another_text\n" +
            "FROM\n" +
            " test_table");

        DataStream<Row> appendStream = tableEnv.toAppendStream(regularJoin, Row.class);
        appendStream.print();
        env.execute("test-union-all");
    }
}

*Error Message:*

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuilder.append(StringBuilder.java:136)
    at scala.StringContext.standardInterpolator(StringContext.scala:126)
    at scala.StringContext.s(StringContext.scala:95)
    at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableStringConstants(CodeGeneratorContext.scala:716)
    at org.apache.flink.table.planner.codegen.GenerateUtils$.generateLiteral(GenerateUtils.scala:357)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:392)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitLiteral(ExprCodeGenerator.scala:51)
    at org.apache.calcite.rex.RexLiteral.accept(RexLiteral.java:1137)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:448)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator$$anonfun$8.apply(ExprCodeGenerator.scala:439)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:439)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
    at org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
    at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
    at org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
    at org.apache.flink.table.planner.codegen.ExpressionReducer$$anonfun$4.apply(ExpressionReducer.scala:84)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:84)

*Code in Flink 1.11.2 with some new syntax:*

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;

public class StreamingJob {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        EnvironmentSettings mySetting = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, mySetting);
        env.setParallelism(1);

        DataStream<String> oriStream = env.fromElements("test", "union all");
        Table testTable = tableEnv.fromDataStream(oriStream, $("text"));
        tableEnv.createTemporaryView("test_table", testTable);

        Table regularJoin = tableEnv.sqlQuery(
            "SELECT\n" +
            " text AS text,\n" +
            " '中文' AS another_text\n" +
            "FROM\n" +
            " test_table\n" +
            "UNION ALL\n" +
            "SELECT\n" +
            " 'another_text' AS text,\n" +
            " '中文' AS another_text\n" +
            "FROM\n" +
            " test_table");

        DataStream<Row> appendStream = tableEnv.toAppendStream(regularJoin, Row.class);
        appendStream.print();
        env.execute("test-union-all");
    }
}

-----
Best wishes.
Smile

--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/