At least I hope it has been fixed. Which version and planner are you using?
On 11.12.19 11:47, Arujit Pradhan wrote:
> Hi Timo,
>
> Thanks for the bug reference.
>
> You mentioned that this bug has been fixed. Is the fix available for
> Flink 1.9+ and the default query planner?
>
> Thanks and regards,
> /Arujit/
>
> On Wed, Dec 11, 2019 at 3:56 PM Timo Walther <[hidden email]> wrote:
>
>     I remember that we fixed some bug around this topic recently. The
>     legacy planner should not be affected.
>
>     There is another user reporting this:
>     https://issues.apache.org/jira/browse/FLINK-15040
>
>     Regards,
>     Timo
>
>     On 11.12.19 10:34, Dawid Wysakowicz wrote:
>     > Hi Arujit,
>     >
>     > Could you also share the query where you use this UDF? It would also
>     > help if you said which version of Flink you are using and which planner.
>     >
>     > Best,
>     >
>     > Dawid
>     >
>     > On 11/12/2019 10:21, Arujit Pradhan wrote:
>     >> Hi all,
>     >>
>     >> We are creating some user-defined functions of type
>     >> AggregateFunction, and we want to send some static metrics from the
>     >> *open()* method of the UDFs, since we can get the *MetricGroup* from the
>     >> *FunctionContext*, which is only exposed in the open method. Our code
>     >> looks something like this (an implementation of count distinct
>     >> in SQL):
>     >>
>     >> public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
>     >>     @Override
>     >>     public DistinctCountAccumulator createAccumulator() {
>     >>         return new DistinctCountAccumulator();
>     >>     }
>     >>
>     >>     @Override
>     >>     public void open(FunctionContext context) throws Exception {
>     >>         super.open(context);
>     >>         MetricGroup metricGroup = context.getMetricGroup();
>     >>         // add some metric to the group here
>     >>         System.out.println("in the open of UDF");
>     >>     }
>     >>
>     >>     @Override
>     >>     public void close() throws Exception {
>     >>         super.close();
>     >>     }
>     >>
>     >>     @Override
>     >>     public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
>     >>         System.out.println("in the udf");
>     >>         return distinctCountAccumulator.count();
>     >>     }
>     >>
>     >>     public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
>     >>         if (item == null) {
>     >>             return;
>     >>         }
>     >>         distinctCountAccumulator.add(item);
>     >>     }
>     >> }
>     >>
>     >> But when we use this UDF in Flink SQL, it seems like the open method is
>     >> not being called at all.
>     >>
>     >> In the Flink UDF documentation we find:
>     >>
>     >> *The |open()| method is called once before the evaluation method. The
>     >> |close()| method after the last call to the evaluation method.*
>     >>
>     >> *The |open()| method provides a |FunctionContext| that contains
>     >> information about the context in which user-defined functions are
>     >> executed, such as the metric group, the distributed cache files, or
>     >> the global job parameters.*
>     >>
>     >> So is there any reason that open is not working in
>     >> AggregateFunctions? By the way, it works fine for ScalarFunctions. Is
>     >> there any alternative scope where we can register some static metrics
>     >> in a UDF?
>     >>
>     >>
>     >> Thanks and regards,
>     >> /Arujit/
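
The DistinctCountAccumulator used by the quoted UDF is not shown anywhere in the thread. A minimal sketch of what it might look like, assuming it only needs the add(String) and count() calls that appear in the quoted code; the field name and the choice of HashSet are assumptions, not the poster's implementation, and Flink may impose further requirements on accumulator types (for example around serialization) that this sketch does not address:

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator: the thread only shows that it has add(String) and count().
class DistinctCountAccumulator {
    // Assumed storage: a set keeps each distinct item exactly once.
    private final Set<String> items = new HashSet<>();

    // Record an item; adding a value that is already present changes nothing.
    public void add(String item) {
        items.add(item);
    }

    // Number of distinct items seen so far.
    public int count() {
        return items.size();
    }
}
```

With this shape, getValue() in the quoted UDF can return count() directly, since the int result is boxed to Integer automatically.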