dong created FLINK-12114:
---------------------------- Summary: Try to perform UDF By Reflection Key: FLINK-12114 URL: https://issues.apache.org/jira/browse/FLINK-12114 Project: Flink Issue Type: Improvement Components: Table SQL / API Affects Versions: 1.7.2 Reporter: dong Hi Team, I recently worked on a Flink SQL-based project, and one of the requirements was to dynamically execute a Flink SQL UDF by loading a user-defined UDF. I first used cglib to dynamically add the eval method to the ScalarFunction and then dynamically created the ScalarFunction instance, which then calls the user-defined UDF by reflection at runtime. This is my code: {code:java} package com.ximalaya.flink.dsl.stream.udf import java.lang.reflect.Method import com.ximalaya.flink.dsl.stream.`type`.FieldType import com.ximalaya.flink.dsl.stream.api.udf.\{AbstractUserUdf, UserUdfContext} import net.sf.cglib.core.Signature import net.sf.cglib.proxy.\{Enhancer, InterfaceMaker, MethodInterceptor, MethodProxy} import org.apache.flink.table.functions.ScalarFunction import org.apache.flink.table.shaded.org.apache.commons.lang.ArrayUtils import net.sf.cglib.asm.Type import scala.collection.mutable.\{Map ⇒ IMap} /** * * @author martin.dong * **/ private class UserUdfFunction extends ScalarFunction{ override def isDeterministic: Boolean = false } private class UdfMethodInterceptor(val name:String, val fullyQualifiedClassName:String) extends MethodInterceptor with Serializable { private var userUdf: AbstractUserUdf = _ private var evalMethods:IMap[MethodSignature,Method]=IMap() private var closeMethod:Method = _ private var openMethod:Method = _ override def intercept(o: scala.Any, method: Method, objects: Array[AnyRef], methodProxy: MethodProxy): AnyRef = { val methodName=method.getName methodName match { case "open"⇒ this.userUdf = Class.forName(fullyQualifiedClassName).newInstance().asInstanceOf[AbstractUserUdf] this.userUdf.getClass.getDeclaredMethods.filter(_.getName=="eval"). 
foreach(method ⇒ evalMethods.put(MethodSignature.createMethodSignature(method), method)) this.closeMethod = classOf[AbstractUserUdf].getDeclaredMethod("close") this.openMethod = classOf[AbstractUserUdf].getDeclaredMethod("open",classOf[UserUdfContext]) openMethod.invoke(userUdf,null) case "eval"⇒ val methodSignature = MethodSignature.createMethodSignature(method) evalMethods(methodSignature).invoke(userUdf,objects:_*) case "close"⇒ closeMethod.invoke(userUdf) case _⇒ methodProxy.invokeSuper(o,objects) } } } private class MethodSignature (val fieldTypes:Array[FieldType]){ def this(clazzArray:Array[Class[_]]){ this(clazzArray.map(clazz⇒FieldType.get(clazz))) } override def hashCode(): Int = fieldTypes.map(_.hashCode()).sum override def equals(obj: scala.Any): Boolean = { if(this.eq(obj.asInstanceOf[AnyRef])){ return true } obj match { case _: MethodSignature⇒ ArrayUtils.isEquals(this.fieldTypes,obj.asInstanceOf[MethodSignature].fieldTypes) case _ ⇒ false } } override def toString: String =fieldTypes.map(_.toString).mkString(",") } private object MethodSignature{ def createMethodSignature(method:Method):MethodSignature={ new MethodSignature(method.getParameterTypes) } } case class EvalMethod(returnType:FieldType,parameters:Array[FieldType],exceptions:List[Class[Throwable]]) object UserUdfFactory { def createUserUdf(name:String,fullyQualifiedClassName:String,evalMethods:List[EvalMethod]):ScalarFunction={ val enhancer = new Enhancer enhancer.setSuperclass(classOf[UserUdfFunction]) enhancer.setCallback(new UdfMethodInterceptor(name,fullyQualifiedClassName)) enhancer.setInterfaces(evalMethods.map(method⇒{ val returnType=Type.getType(method.returnType.getClazz) val parameters=method.parameters.map(p⇒Type.getType(p.getClazz)) (new Signature("eval",returnType,parameters),method.exceptions) }).map{ case(signature,exceptions)⇒ val im = new InterfaceMaker im.add(signature,exceptions.map(exception⇒Type.getType(exception)).toArray) im.create() }.toArray) 
enhancer.create().asInstanceOf[ScalarFunction] } } {code} This can be executed in local mode but not in YARN mode, where the following error occurs: {code:java} Caused by: org.codehaus.commons.compiler.CompileException: Line 5, Column 10: Cannot determine simple type name "com" at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:11672) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6416) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6177) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6190) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6156) at org.codehaus.janino.UnitCompiler.access$13300(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6064) at org.codehaus.janino.UnitCompiler$18$1.visitReferenceType(UnitCompiler.java:6059) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3754) at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6059) at org.codehaus.janino.UnitCompiler$18.visitType(UnitCompiler.java:6052) at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3753) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at org.codehaus.janino.UnitCompiler.access$1200(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$21.getType(UnitCompiler.java:7844) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6456) at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6082) at org.codehaus.janino.UnitCompiler$18$2$1.visitFieldAccess(UnitCompiler.java:6077) at org.codehaus.janino.Java$FieldAccess.accept(Java.java:4136) at 
org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6052) at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6438) at org.codehaus.janino.UnitCompiler.access$13600(UnitCompiler.java:212) at org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6080) at org.codehaus.janino.UnitCompiler$18$2$1.visitAmbiguousName(UnitCompiler.java:6077) at org.codehaus.janino.Java$AmbiguousName.accept(Java.java:4050) at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6077) at org.codehaus.janino.UnitCompiler$18$2.visitLvalue(UnitCompiler.java:6073) at org.codehaus.janino.Java$Lvalue.accept(Java.java:3974) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6073) at org.codehaus.janino.UnitCompiler$18.visitRvalue(UnitCompiler.java:6052) at org.codehaus.janino.Java$Rvalue.accept(Java.java:3942) {code} Can anyone help me? -- This message was sent by Atlassian JIRA (v7.6.3#76005) |
Free forum by Nabble | Edit this page |