Stefano Bortoli created FLINK-7339:
--------------------------------------
Summary: aggregationToString fails when user defined aggregation contains constants
Key: FLINK-7339
URL:
https://issues.apache.org/jira/browse/FLINK-7339 Project: Flink
Issue Type: Bug
Components: DataStream API, Table API & SQL
Affects Versions: 1.3.1
Reporter: Stefano Bortoli
Issue related to FLINK-7338, when the user defined aggregation contains a constant it breaks the aggregation translation to string, which are mapped 1 to 1 to the input fields.
OverAggregates.scala aggregationToString function fails to find a parameter of the function among the input fields, and therefore throws a RuntimeException.
{code}
private[flink] def aggregationToString(
inputType: RelDataType,
rowType: RelDataType,
namedAggregates: Seq[CalcitePair[AggregateCall, String]]): String = {
val inFields = inputType.getFieldNames.asScala
val outFields = rowType.getFieldNames.asScala
val aggStrings = namedAggregates.map(_.getKey).map(
a => s"${a.getAggregation}(${
if (a.getArgList.size() > 0) {
// ERROR HAPPENS HERE!
a.getArgList.asScala.map(inFields(_)).mkString(", ")
} else {
"*"
}
})")
(inFields ++ aggStrings).zip(outFields).map {
case (f, o) => if (f == o) {
f
} else {
s"$f AS $o"
}
}.mkString(", ")
}
{code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)