Re: Open Method is not being called in case of AggregateFunction UDFs


Timo Walther-2
At least I hope it has been fixed. Which version and planner are you using?


On 11.12.19 11:47, Arujit Pradhan wrote:

> Hi Timo,
>
> Thanks for the bug reference.
>
> You mentioned that this bug has been fixed. Is the fix available for
> Flink 1.9+ and the default query planner?
>
> Thanks and regards,
> /Arujit/
>
> On Wed, Dec 11, 2019 at 3:56 PM Timo Walther <[hidden email]> wrote:
>
>     I remember that we fixed some bug around this topic recently. The
>     legacy planner should not be affected.
>
>     There is another user reporting this:
>     https://issues.apache.org/jira/browse/FLINK-15040
>
>     Regards,
>     Timo
>
>     On 11.12.19 10:34, Dawid Wysakowicz wrote:
>      > Hi Arujit,
>      >
>      > Could you also share the query where you use this UDF? It would also
>      > help if you said which version of Flink you are using and which planner.
>      >
>      > Best,
>      >
>      > Dawid
>      >
>      > On 11/12/2019 10:21, Arujit Pradhan wrote:
>      >> Hi all,
>      >>
>      >> We are creating some user-defined functions of type
>      >> AggregateFunction, and we want to send some static metrics from the
>      >> *open()* method of the UDFs, since we can get the *MetricGroup* from the
>      >> *FunctionContext*, which is only exposed in the open method. Our code
>      >> looks something like this (it is an implementation of count distinct
>      >> in SQL):
>      >>
>      >> public class DistinctCount extends AggregateFunction<Integer, DistinctCountAccumulator> {
>      >>
>      >>     @Override
>      >>     public DistinctCountAccumulator createAccumulator() {
>      >>         return new DistinctCountAccumulator();
>      >>     }
>      >>
>      >>     @Override
>      >>     public void open(FunctionContext context) throws Exception {
>      >>         super.open(context);
>      >>         MetricGroup metricGroup = context.getMetricGroup();
>      >>         // add some metric to the group here
>      >>         System.out.println("in the open of UDF");
>      >>     }
>      >>
>      >>     @Override
>      >>     public void close() throws Exception {
>      >>         super.close();
>      >>     }
>      >>
>      >>     @Override
>      >>     public Integer getValue(DistinctCountAccumulator distinctCountAccumulator) {
>      >>         System.out.println("in the udf");
>      >>         return distinctCountAccumulator.count();
>      >>     }
>      >>
>      >>     public void accumulate(DistinctCountAccumulator distinctCountAccumulator, String item) {
>      >>         if (item == null) {
>      >>             return;
>      >>         }
>      >>         distinctCountAccumulator.add(item);
>      >>     }
>      >> }
>      >>
>      >> But when we use this UDF in Flink SQL, it seems the open method is
>      >> not being called at all.
>      >>
>      >> In the Flink UDF documentation we find:
>      >>
>      >> *The |open()| method is called once before the evaluation method. The
>      >> |close()| method after the last call to the evaluation method.*
>      >>
>      >> *The |open()| method provides a |FunctionContext| that contains
>      >> information about the context in which user-defined functions are
>      >> executed, such as the metric group, the distributed cache files, or
>      >> the global job parameters.*
>      >>
>      >> Is there any reason why open does not work in
>      >> AggregateFunctions? By the way, it works fine for ScalarFunctions. Is
>      >> there any alternative scope where we can register some static metrics
>      >> in a UDF?
>      >>
>      >>
>      >> Thanks and regards,
>      >> /Arujit/
>      >>
>
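
The DistinctCountAccumulator used by the quoted UDF is not shown anywhere in the thread. For readers who want to reproduce the issue, a minimal sketch could look like the following. Only the method names add(String) and count() come from the quoted code; the HashSet-backed storage and the int return type are assumptions, and the original class may well differ.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical accumulator for the DistinctCount UDF quoted above.
// Assumption: distinct items are tracked in an in-memory HashSet.
class DistinctCountAccumulator {

    // Every distinct non-null item seen so far.
    private final Set<String> items = new HashSet<>();

    // Records an item; adding the same item again has no effect.
    public void add(String item) {
        items.add(item);
    }

    // Number of distinct items recorded so far.
    public int count() {
        return items.size();
    }
}
```

With this sketch, adding "a", "b" and "a" again leaves count() at 2, which is the value getValue() in the quoted UDF would return for that accumulator.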