Jingsong Lee created FLINK-11974:
------------------------------------
Summary: Introduce StreamOperatorSubstitutor to help table perform the whole Operator CodeGen
Key: FLINK-11974
URL: https://issues.apache.org/jira/browse/FLINK-11974
Project: Flink
Issue Type: New Feature
Components: Runtime / Operators
Reporter: Jingsong Lee
Assignee: Jingsong Lee
If we need to CodeGen an entire Operator, one possible solution is to introduce an OperatorWrapper that generates a CodeGen sub-Operator in its open() method and then proxies all method calls to that sub-Operator.
Doing so adds an extra virtual function call for every proxied method, so instead we introduce a StreamOperatorSubstitutor:
{code:java}
/**
* Basic interface for stream operator substitutes. A substitute is transferred to the StreamTask
* by serialization and produces the actual stream operator there; the StreamTask then uses the
* actual stream operator to run.
*
* @param <OUT> output type of the actual stream operator
*/
public interface StreamOperatorSubstitutor<OUT> {
/**
* Produces the actual stream operator.
*
* @param userCodeClassLoader the user code class loader to use.
* @return the actual stream operator created on {@code StreamTask}.
*/
StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
}
{code}
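As an illustration of how the table side might implement this interface, here is a minimal, self-contained sketch. It does not use Flink classes: StreamOperator is reduced to a hypothetical one-method stand-in, and ClassNameSubstitutor and UpperCaseOperator are made-up names. Loading a class by name through the user code class loader stands in for the real "compile the generated code" step, which this issue does not specify.

```java
import java.io.Serializable;

public class SubstitutorSketch {

    /** Hypothetical stand-in for Flink's StreamOperator, reduced to one method. */
    public interface StreamOperator<OUT> {
        OUT process(String input);
    }

    /** Same shape as the interface proposed in this issue. */
    public interface StreamOperatorSubstitutor<OUT> {
        StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
    }

    /** Stand-in for a generated operator class (assumption: has a public no-arg constructor). */
    public static class UpperCaseOperator implements StreamOperator<String> {
        @Override
        public String process(String input) {
            return input.toUpperCase();
        }
    }

    /**
     * Hypothetical substitutor: only the class name is serialized and shipped;
     * the actual operator is instantiated on the task side.
     */
    public static class ClassNameSubstitutor<OUT>
            implements StreamOperatorSubstitutor<OUT>, Serializable {

        private static final long serialVersionUID = 1L;

        private final String operatorClassName;

        public ClassNameSubstitutor(String operatorClassName) {
            this.operatorClassName = operatorClassName;
        }

        @Override
        @SuppressWarnings("unchecked")
        public StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader) {
            try {
                // Resolve the class with the user code class loader, then instantiate it.
                Class<?> clazz = Class.forName(operatorClassName, true, userCodeClassLoader);
                return (StreamOperator<OUT>) clazz.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException(
                        "Could not instantiate " + operatorClassName, e);
            }
        }
    }
}
```

The object returned by getActualStreamOperator is the operator itself, not a proxy around it, so the task calls its methods directly.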
In StreamConfig.getStreamOperator, we need:
{code:java}
if (operator instanceof StreamOperatorSubstitutor) {
return (T) ((StreamOperatorSubstitutor<?>) operator).getActualStreamOperator(cl);
} else {
return (T) operator;
}
{code}
to get the real operator.
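The difference between the two approaches can be sketched outside of Flink as follows. All types here are hypothetical stand-ins (StreamOperator is reduced to a single method, and resolve only mirrors the branch shown above, not the actual StreamConfig code). The wrapper stays in the call path, while resolving a substitutor hands back the real operator.

```java
public class OperatorResolutionSketch {

    /** Hypothetical stand-in for Flink's StreamOperator. */
    public interface StreamOperator<OUT> {
        OUT process(String input);
    }

    /** Same shape as the proposed interface. */
    public interface StreamOperatorSubstitutor<OUT> {
        StreamOperator<OUT> getActualStreamOperator(ClassLoader userCodeClassLoader);
    }

    /** Stand-in for a generated operator. */
    public static class LengthOperator implements StreamOperator<Integer> {
        @Override
        public Integer process(String input) {
            return input.length();
        }
    }

    /** Wrapper approach: every call is forwarded, so the wrapper never leaves the call path. */
    public static class OperatorWrapper<OUT> implements StreamOperator<OUT> {
        private final StreamOperator<OUT> delegate;

        public OperatorWrapper(StreamOperator<OUT> delegate) {
            this.delegate = delegate;
        }

        @Override
        public OUT process(String input) {
            return delegate.process(input); // extra hop on every call
        }
    }

    /** Substitutor approach: resolved once, after which callers hold the real operator. */
    @SuppressWarnings("unchecked")
    public static <T> T resolve(Object operator, ClassLoader cl) {
        // instanceof is false for null, so no separate null check is needed.
        if (operator instanceof StreamOperatorSubstitutor) {
            return (T) ((StreamOperatorSubstitutor<?>) operator).getActualStreamOperator(cl);
        }
        return (T) operator;
    }
}
```

With the wrapper, the task holds an OperatorWrapper and pays for the forwarding call each time; with resolve, the substitutor is consulted once and the task holds the LengthOperator directly. Plain operators and null pass through unchanged.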