[jira] [Created] (FLINK-19161) Port File Sources to FLIP-27 API



Shang Yuanchun (Jira)
Stephan Ewen created FLINK-19161:
------------------------------------

             Summary: Port File Sources to FLIP-27 API
                 Key: FLINK-19161
                 URL: https://issues.apache.org/jira/browse/FLINK-19161
             Project: Flink
          Issue Type: Sub-task
          Components: Connectors / FileSystem
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 1.12.0


Porting the File sources to the FLIP-27 API means combining:
  - the FileInputFormat from the DataSet Batch API
  - the Monitoring File Source from the DataStream API

The two already share the same reader code and part of the enumeration code.

*Structure*

The new File Source will have three components:

  - File enumerators that discover the files.
  - File split assigners that decide which reader gets which split.
  - File Reader Formats, which deal with the decoding.
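
As a rough illustration of how these three components could fit together, here is a minimal sketch in plain Java. All names in it (Split, Enumerator, Assigner, ReaderFormat, readAll) are hypothetical and chosen for this example only; they are not the interfaces proposed for Flink. The "file system" is an in-memory map from path to content so that the sketch is self-contained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.TreeMap;

/** Hypothetical sketch of the three File Source components; not Flink's actual API. */
public class FileSourceComponentsSketch {

    /** A unit of work: here simply a whole file, identified by its path. */
    static final class Split {
        final String path;
        Split(String path) { this.path = path; }
    }

    /** Discovers the files (as splits) in the given in-memory "file system". */
    interface Enumerator {
        List<Split> enumerate(Map<String, String> files);
    }

    /** Decides which reader gets which split. */
    interface Assigner {
        void add(Collection<Split> splits);
        Optional<Split> next(int readerId);
    }

    /** Decodes the raw content of a file into records. */
    interface ReaderFormat<T> {
        List<T> decode(String content);
    }

    /** Runs enumerate -> assign -> decode once with a single reader (bounded case). */
    static List<String> readAll(Map<String, String> files) {
        Enumerator enumerator = fs -> {
            List<Split> splits = new ArrayList<>();
            for (String path : fs.keySet()) splits.add(new Split(path));
            return splits;
        };
        Queue<Split> pending = new ArrayDeque<>();
        Assigner assigner = new Assigner() {
            public void add(Collection<Split> splits) { pending.addAll(splits); }
            // First come, first served; a real assigner could consider locality.
            public Optional<Split> next(int readerId) { return Optional.ofNullable(pending.poll()); }
        };
        ReaderFormat<String> lines = content -> Arrays.asList(content.split("\n"));

        assigner.add(enumerator.enumerate(files));
        List<String> records = new ArrayList<>();
        Optional<Split> split;
        while ((split = assigner.next(0)).isPresent()) {
            records.addAll(lines.decode(files.get(split.get().path)));
        }
        return records;
    }

    public static void main(String[] args) {
        Map<String, String> files = new TreeMap<>();
        files.put("/data/a.txt", "1\n2");
        files.put("/data/b.txt", "3");
        System.out.println(readAll(files));
    }
}
```

Keeping the three roles behind separate interfaces is what would let a format be reused unchanged while the discovery or assignment strategy is swapped.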


The main difference between the bounded (batch) version and the unbounded (streaming) version is that the streaming version repeatedly invokes the file enumerator to search for new files.

*Checkpointing Enumerators*

The enumerators need to checkpoint the not-yet-assigned splits and, if they are in continuous discovery mode (streaming), the paths / timestamps already processed.
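
A minimal sketch of such enumerator state might look as follows. The class and method names are hypothetical, a split is represented only by its path, and timestamps are left out for brevity; the point is just that the snapshot holds both the pending splits and the set of paths already seen, so that a restored enumerator neither loses nor rediscovers work.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of enumerator checkpoint state; names are illustrative only. */
public class EnumeratorCheckpointSketch {

    /** Snapshot: splits not yet handed to a reader, plus paths already discovered. */
    static final class Checkpoint {
        final List<String> unassignedSplits;
        final Set<String> processedPaths;

        Checkpoint(List<String> unassignedSplits, Set<String> processedPaths) {
            // Defensive copies, so later changes to the enumerator do not alter the snapshot.
            this.unassignedSplits = new ArrayList<>(unassignedSplits);
            this.processedPaths = new HashSet<>(processedPaths);
        }
    }

    private final List<String> unassigned = new ArrayList<>();
    private final Set<String> processed = new HashSet<>();

    /** Creates a fresh enumerator with no state. */
    EnumeratorCheckpointSketch() {}

    /** Restores an enumerator from a checkpoint. */
    EnumeratorCheckpointSketch(Checkpoint restored) {
        unassigned.addAll(restored.unassignedSplits);
        processed.addAll(restored.processedPaths);
    }

    /** One discovery round: only paths not seen before become new splits. */
    void discover(List<String> currentListing) {
        for (String path : currentListing) {
            if (processed.add(path)) {
                unassigned.add(path);
            }
        }
    }

    /** Hands out the next pending split, or null if none is pending. */
    String assignNext() {
        return unassigned.isEmpty() ? null : unassigned.remove(0);
    }

    /** Captures the current state for a checkpoint. */
    Checkpoint snapshot() {
        return new Checkpoint(unassigned, processed);
    }
}
```

In the bounded case, discover would be called once; in continuous discovery mode it would be called repeatedly, which is why the processed paths have to be part of the checkpoint there.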

*Checkpointing Readers*

The new File Source needs to ensure that every reader can be checkpointed.
Some readers may be able to expose the position in the input file that corresponds to the latest emitted record, but many will not be able to do that because they
  - store compressed record batches
  - use buffered decoders where exact position information is not accessible

We therefore suggest exposing a mechanism that combines a seekable file offset with a number of records to read and skip after that offset. In the extreme cases, files can work only with seekable positions or only with records-to-skip. Some formats, like Avro, can have periodic seek points (sync markers) and count records-to-skip after these markers.
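
The following sketch illustrates the idea under simplifying assumptions. The class names (ReaderPositionSketch, Position) are hypothetical, and the "file" is modeled as an in-memory map from seekable offsets (standing in for sync markers) to blocks of records. On restore, the reader seeks to the checkpointed offset and then reads and drops the given number of records.

```java
import java.util.List;
import java.util.TreeMap;

/** Hypothetical sketch of "seekable offset + records to skip"; not Flink's actual classes. */
public class ReaderPositionSketch {

    /** Checkpointed position: last seek point, and how many records were emitted after it. */
    static final class Position {
        final long offset;
        final long recordsToSkip;

        Position(long offset, long recordsToSkip) {
            this.offset = offset;
            this.recordsToSkip = recordsToSkip;
        }
    }

    /** Toy file: each key is a seekable offset, each value the block of records stored there. */
    private final TreeMap<Long, List<String>> blocks;
    private long blockOffset;
    private int indexInBlock;

    /** Opens the file at the given position; the offset must be one of the seek points. */
    ReaderPositionSketch(TreeMap<Long, List<String>> blocks, Position start) {
        this.blocks = blocks;
        this.blockOffset = start.offset;           // "seek" to the checkpointed offset
        for (long i = 0; i < start.recordsToSkip; i++) {
            next();                                 // re-read and drop already emitted records
        }
    }

    /** Returns the next record, or null at end of file. */
    String next() {
        while (true) {
            List<String> block = blocks.get(blockOffset);
            if (block == null) return null;
            if (indexInBlock < block.size()) return block.get(indexInBlock++);
            Long nextOffset = blocks.higherKey(blockOffset);
            if (nextOffset == null) return null;
            blockOffset = nextOffset;               // passed a seek point: reset the record count
            indexInBlock = 0;
        }
    }

    /** Position that corresponds to the latest emitted record. */
    Position position() {
        return new Position(blockOffset, indexInBlock);
    }
}
```

In this model, a file with a single block at offset 0 is the "records-to-skip only" extreme, and a file where every record has its own seek point approaches the "seekable positions only" extreme.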

*Efficient and Convenient Readers*

To balance efficiency (batch vectorized reading of ORC / Parquet for vectorized query processing) and convenience (plugging in a third-party CSV decoder over a stream), we offer three abstractions for record readers:

  - Bulk Formats that run over a file Path and return an iterable batch at a time _(most efficient)_

  - File Record Formats, which read files record-by-record. The source framework hands over a pre-defined-size batch from the Split Reader to the Record Emitter.

  - Stream Formats that decode an input stream and rely on the source framework to decide how to batch record handover _(most convenient)_
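
To make the difference between the three abstractions more concrete, here is a hedged sketch of what their shapes could look like. The interface names and signatures (BulkFormat, RecordReader, StreamFormat, batch, LINES) are assumptions made for this example and are not the proposed Flink interfaces. The sketch shows a line-based stream format whose records are grouped into fixed-size batches by a framework-side helper.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical shapes for the three reader abstractions; signatures are illustrative only. */
public class ReaderFormatsSketch {

    /** Bulk format: the format itself produces whole batches from a file path. */
    interface BulkFormat<T> {
        Iterator<List<T>> readBatches(String path);
    }

    /** Record-by-record reading; returns null at end of input. */
    interface RecordReader<T> {
        T read();
    }

    /** Stream format: only decodes an input stream; batching is left to the framework. */
    interface StreamFormat<T> {
        RecordReader<T> open(InputStream in);
    }

    /** Framework side: groups single records into batches of a pre-defined size. */
    static <T> List<List<T>> batch(RecordReader<T> reader, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        List<T> current = new ArrayList<>();
        T record;
        while ((record = reader.read()) != null) {
            current.add(record);
            if (current.size() == batchSize) {
                batches.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) batches.add(current);   // last, possibly smaller batch
        return batches;
    }

    /** Example stream format: one record per text line. */
    static final StreamFormat<String> LINES = in -> {
        BufferedReader br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        return () -> {
            try {
                return br.readLine();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        };
    };
}
```

The trade-off is visible in who owns batching: a bulk format controls it and can return, for example, one vectorized batch per row group, while a stream format only has to decode and leaves the batch size to the framework.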



--
This message was sent by Atlassian Jira
(v8.3.4#803005)