Wrapping a Flink Function

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Wrapping a Flink Function

Lorenzo Pirazzini

Hello, I’m having trouble finding a way to add logic to an existing SinkFunction.

What I would like to do is wrap an input SinkFunction inside another one that will perform its logic and then perform some additional logic, e.g.:

 

SinkFunction<TOUT> wrapFunction(SinkFunction<TOUT> function) {

 
return new SinkFunction<TOUT>() {
   
@Override
   
public void invoke(TOUT value, Context context) throws Exception {
     
function.invoke(value, context);
     
//additional logic
   
}
  };
}

 

This is not clean since if the input function is actually a RichSinkFunction (or another extension that adds other functionalities) I’ll lose that functionalities because my wrapper is not exposing them (if I take a RichSinkFunction as parameter I lose the open() and close() methods since my wrapper is not exposing them).

One thing that I could do is to define different wrappers, one for each type extending SinkFunction, which will then expose all the methods implemented invoking the underlying wrapped function relative method, but I don’t see this as a good solution.

Is there a way to add my custom logic to a function keeping all its features?

 

Thanks in advance

 

 

Lorenzo Pirazzini

Big Data Engineer

E-mail: [hidden email]

Web Site: www.agilelab.it

 

 

 

 

Reply | Threaded
Open this post in threaded view
|

Re: Wrapping a Flink Function

Teng Fei Liao
I think any solution here is inherently fragile since future versions of
flink can have different abstract classes or interfaces you won't know it
has to support. But for a given release, something you can consider is a
wrapper class that extends/implements the ones you support. Then, during
the method invocation, check delegate instanceof ClassWhereMethodComesFrom
and no-op if this is false.

>
Reply | Threaded
Open this post in threaded view
|

Re: Wrapping a Flink Function

Aljoscha Krettek-2
In reply to this post by Lorenzo Pirazzini
Could you maybe outline how you want to extend the wrapped sink
functionality? A better approach might be to add an operation "in front"
of the sink.

Best,
Aljoscha

On 08.10.20 11:32, Lorenzo Pirazzini wrote:

> Hello, I'm having trouble finding a way to add logic to an existing SinkFunction.
> What I would like to do is wrap an input SinkFunction inside another one that will perform its logic and then perform some additional logic, e.g.:
>
> SinkFunction<TOUT> wrapFunction(SinkFunction<TOUT> function) {
>
>    return new SinkFunction<TOUT>() {
>      @Override
>      public void invoke(TOUT value, Context context) throws Exception {
>        function.invoke(value, context);
>        //additional logic
>      }
>    };
> }
>
> This is not clean since if the input function is actually a RichSinkFunction (or another extension that adds other functionalities) I'll lose that functionalities because my wrapper is not exposing them (if I take a RichSinkFunction as parameter I lose the open() and close() methods since my wrapper is not exposing them).
> One thing that I could do is to define different wrappers, one for each type extending SinkFunction, which will then expose all the methods implemented invoking the underlying wrapped function relative method, but I don't see this as a good solution.
> Is there a way to add my custom logic to a function keeping all its features?
>
> Thanks in advance
>
>
> [cid:image003.jpg@01D69D66.AA3E6850]
> Lorenzo Pirazzini
> Big Data Engineer
> E-mail: [hidden email]<mailto:[hidden email]>
> Web Site: www.agilelab.it<http://www.agilelab.it>
>
>
>
>
>