[jira] [Created] (FLINK-17899) Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-17899) Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources

Shang Yuanchun (Jira)
Stephan Ewen created FLINK-17899:
------------------------------------

             Summary: Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources
                 Key: FLINK-17899
                 URL: https://issues.apache.org/jira/browse/FLINK-17899
             Project: Flink
          Issue Type: Sub-task
          Components: API / DataStream
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen


*Preambel:* This whole discussion is to some extend only necessary, because in the {{SourceReader}}, we pass the {{SourceOutput}} as a parameter to the {{pollNext(...)}} method. However, this design follows some deeper runtime pipeline design, and is not easy to change at this stage.

 

There are some principle design choices here:

 

*(1) Do we make Timestamps and Watermarks purely a feature of the library (ConnectorBase), or do we integrate it with the core (SourceOperator).*

Making it purely a responsibility of the ConnectorBase would have the advantage of keeping the SourceOperator simple. However, there is value in integrating this with the SourceOperator.
 - Implementations that are not using the ConnectorBase (like simple collection- or iterator-based sources) would automatically get access to the plug-able TimestampExtractors and WatermarkGenerators.

 - When executing batch programs, the SourceOperator can transparently inject a "no-op" WatermarkGenerator so make sure no Watermarks are generated during the batch execution. Given that batch sources are very performance sensitive, it seems useful to not even run the watermark generator logic, rather than later dropping the watermarks.

 - In a future version, we may want to implement "global watermark holds" generated my the Enumerators: The enumerator tells the readers how far they may advance their local watermarks. This can help to not prematurely advance the watermark based on a split's records when other splits have data overlapping with older ranges. An example where this is commonly the case is the streaming file source.

 

*(2) Is the per-partition watermarking purely a feature of the library (ConnectorBase), or do we integrate it with the core (SourceOperator).*

I believe we need to solve this on the same level as the previous question:
 - Once a connector instantiates the per-partition watermark generators, the main output (through which the SourceReader emits the records) must not run its watermark generator any more. Otherwise we extract watermarks also on the merged stream, which messes things up. So having the per-partition watermark generators simply in the ConnectorBase and emit transparently through an unchanged main output would not work.

 - So, if we decide to implement watermarks support in the core (SourceOperator), we would need to offer the per-partition watermarking utilities on that level as well.

 - Along a similar line of thoughts as in the previous point, the batch execution can optimize the watermark extraction by supplying no-op extractors also for the per-partition extractors (which will most likely bear the bulk of the load in the connectors).

 

*(3) How would an integration of WatermarkGenerators with the SourceOperator look like?*

Rather straightforward, the SourceOperator instantiates a SourceOutput that internally runs the timestamp extractor and watermark generator and emits to the DataOutput that the operator emits to.

 

*(4) How would an integration of the per-split WatermarkGenerators look like?*

I would propose to add a method to the {{SourceReaderContext}}: {{SplitAwareOutputs createSourceAwareOutputs()}}

The {{SplitAwareOutputs}} looks the following way:
{code:java}
public interface SplitAwareOutputs<T> {

    SourceOutput<T> createOutputForSplit(String splitId);

    void releaseOutputForSplit(String splitId);
}
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)