Nico Kruber created FLINK-19112:
----------------------------------- Summary: No access to metric group in ScalarFunction when optimizing Key: FLINK-19112 URL: https://issues.apache.org/jira/browse/FLINK-19112 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.11.1 Reporter: Nico Kruber Attachments: MetricsGroupBug.java Under some circumstances, I cannot access {{context.getMetricGroup()}} in a {{ScalarFunction}} like this (full job attached): {code:java} public static class MyUDF extends ScalarFunction { @Override public void open(FunctionContext context) throws Exception { super.open(context); context.getMetricGroup(); } public Integer eval(Integer id) { return id; } } {code} which leads to this exception: {code:java} Exception in thread "main" java.lang.UnsupportedOperationException: getMetricGroup is not supported when optimizing at org.apache.flink.table.planner.codegen.ConstantFunctionContext.getMetricGroup(ExpressionReducer.scala:249) at com.ververica.MetricsGroupBug$MyUDF.open(MetricsGroupBug.java:57) at ExpressionReducer$2.open(Unknown Source) at org.apache.flink.table.planner.codegen.ExpressionReducer.reduce(ExpressionReducer.scala:118) at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressionsInternal(ReduceExpressionsRule.java:696) at org.apache.calcite.rel.rules.ReduceExpressionsRule.reduceExpressions(ReduceExpressionsRule.java:618) at org.apache.calcite.rel.rules.ReduceExpressionsRule$ProjectReduceExpressionsRule.onMatch(ReduceExpressionsRule.java:303) at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:328) at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:562) at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:427) at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:264) at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127) at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:223) at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:210) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepProgram.optimize(FlinkHepProgram.scala:69) at org.apache.flink.table.planner.plan.optimize.program.FlinkHepRuleSetProgram.optimize(FlinkHepRuleSetProgram.scala:87) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.$anonfun$optimize$1(FlinkChainedProgram.scala:62) at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:156) at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:156) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:156) at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:154) at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram.optimize(FlinkChainedProgram.scala:58) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.optimizeTree(StreamCommonSubGraphBasedOptimizer.scala:164) at org.apache.flink.table.planner.plan.optimize.StreamCommonSubGraphBasedOptimizer.doOptimize(StreamCommonSubGraphBasedOptimizer.scala:84) at org.apache.flink.table.planner.plan.optimize.CommonSubGraphBasedOptimizer.optimize(CommonSubGraphBasedOptimizer.scala:77) at org.apache.flink.table.planner.delegation.PlannerBase.optimize(PlannerBase.scala:279) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:164) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:565) at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:549) at com.ververica.MetricsGroupBug.main(MetricsGroupBug.java:50) {code} I also tried to work around this with a try-catch, assuming that this method is called once during optimisation and another time at runtime. However, it seems as if {{open()}} is actually only called once (during optimization) thus giving me no choice to access the metrics group. -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |