Hello,

I'm having trouble finding a way to add logic to an existing SinkFunction. What I would like to do is wrap an input SinkFunction inside another one that performs the wrapped function's logic and then some additional logic, e.g.:

    SinkFunction<TOUT> wrapFunction(SinkFunction<TOUT> function) {
        return new SinkFunction<TOUT>() {
            @Override
            public void invoke(TOUT value, Context context) throws Exception {
                function.invoke(value, context);
                // additional logic
            }
        };
    }

This is not clean: if the input function is actually a RichSinkFunction (or another extension that adds functionality), I lose that functionality because my wrapper does not expose it. For example, if I take a RichSinkFunction as a parameter, I lose its open() and close() methods.

One thing I could do is define a separate wrapper for each type extending SinkFunction, each exposing all of that type's methods and delegating to the corresponding method of the wrapped function, but I don't see this as a good solution.

Is there a way to add my custom logic to a function while keeping all its features?

Thanks in advance
|
I think any solution here is inherently fragile, since future versions of Flink can add abstract classes or interfaces that your wrapper won't know it has to support. But for a given release, something you can consider is a wrapper class that extends/implements the ones you support. Then, during each method invocation, check delegate instanceof ClassWhereMethodComesFrom and no-op if this is false.
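A minimal sketch of that instanceof-guarded wrapper, for illustration only. To keep it compilable without Flink on the classpath, Sink and Lifecycle are hypothetical stand-ins for SinkFunction and the open()/close() part of RichFunction; they are simplified (Flink's invoke also takes a Context, and open() in the Flink version discussed here takes a Configuration). A real wrapper would presumably also have to forward the runtime context to the delegate.

```java
import java.util.ArrayList;
import java.util.List;

final class WrappingSinkSketch {
    // Hypothetical stand-in for Flink's SinkFunction (simplified signature).
    interface Sink<T> {
        void invoke(T value) throws Exception;
    }

    // Hypothetical stand-in for the lifecycle methods a rich function adds.
    interface Lifecycle {
        void open() throws Exception;
        void close() throws Exception;
    }

    // Implements every interface the wrapper chooses to support and forwards
    // a call only when the delegate actually implements that interface.
    static final class WrappingSink<T> implements Sink<T>, Lifecycle {
        private final Sink<T> delegate;
        private final List<T> audit; // the "additional logic": record each value

        WrappingSink(Sink<T> delegate, List<T> audit) {
            this.delegate = delegate;
            this.audit = audit;
        }

        @Override
        public void open() throws Exception {
            if (delegate instanceof Lifecycle) {
                ((Lifecycle) delegate).open();
            } // otherwise a no-op
        }

        @Override
        public void close() throws Exception {
            if (delegate instanceof Lifecycle) {
                ((Lifecycle) delegate).close();
            } // otherwise a no-op
        }

        @Override
        public void invoke(T value) throws Exception {
            delegate.invoke(value); // the wrapped sink's own logic first
            audit.add(value);       // then the additional logic
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> audit = new ArrayList<>();
        Sink<String> plain = value -> { }; // delegate without lifecycle methods
        WrappingSink<String> wrapped = new WrappingSink<>(plain, audit);
        wrapped.open();      // no-op for this delegate
        wrapped.invoke("a");
        wrapped.close();     // no-op for this delegate
        System.out.println(audit); // prints: [a]
    }
}
```

The fragility mentioned above shows up directly: any interface the delegate implements that WrappingSink does not list is silently dropped.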
In reply to this post by Lorenzo Pirazzini
Could you maybe outline how you want to extend the wrapped sink functionality? A better approach might be to add an operation "in front" of the sink.

Best,
Aljoscha

On 08.10.20 11:32, Lorenzo Pirazzini wrote:
> Hello, I'm having trouble finding a way to add logic to an existing SinkFunction.
> What I would like to do is wrap an input SinkFunction inside another one that will perform its logic and then perform some additional logic, e.g.:
>
> SinkFunction<TOUT> wrapFunction(SinkFunction<TOUT> function) {
>
>     return new SinkFunction<TOUT>() {
>         @Override
>         public void invoke(TOUT value, Context context) throws Exception {
>             function.invoke(value, context);
>             //additional logic
>         }
>     };
> }
>
> This is not clean since if the input function is actually a RichSinkFunction (or another extension that adds other functionalities) I'll lose that functionalities because my wrapper is not exposing them (if I take a RichSinkFunction as parameter I lose the open() and close() methods since my wrapper is not exposing them).
> One thing that I could do is to define different wrappers, one for each type extending SinkFunction, which will then expose all the methods implemented invoking the underlying wrapped function relative method, but I don't see this as a good solution.
> Is there a way to add my custom logic to a function keeping all its features?
>
> Thanks in advance
>
> Lorenzo Pirazzini
> Big Data Engineer
> Web Site: www.agilelab.it
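To illustrate the "operation in front of the sink" idea from the reply above, here is a plain-Java sketch under stated assumptions: Function and Consumer stand in for a pass-through operator and the untouched sink, and tap is a hypothetical helper name. In a Flink job the rough equivalent would be a map step on the DataStream followed by addSink with the original, unwrapped sink.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class FrontOfSinkSketch {
    // Hypothetical helper: builds a pass-through step that runs the extra
    // logic and then forwards the value unchanged to whatever comes next.
    static <T> Function<T, T> tap(Consumer<T> extraLogic) {
        return value -> {
            extraLogic.accept(value);
            return value;
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();    // filled by the extra logic
        List<String> written = new ArrayList<>(); // filled by the sink

        Consumer<String> sink = written::add; // stands in for the original sink
        Function<String, String> step = FrontOfSinkSketch.<String>tap(seen::add);

        for (String value : Arrays.asList("a", "b")) {
            sink.accept(step.apply(value)); // extra logic first, then the sink
        }
        System.out.println(seen + " " + written); // prints: [a, b] [a, b]
    }
}
```

Because the sink itself is never wrapped, its open(), close(), and any other features stay intact. The trade-off is that the extra logic runs before the write rather than after it, so whether this fits depends on what the additional logic needs to do.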