Flink Custom SourceFunction and SinkFunction

Siew Wai Yow
Hi guys,

I have a few questions on this topic that I need your expertise on:

  1.  I need to build an SFTP SourceFunction. Is Hadoop's SFTPFileSystem suitable for that?
  2.  I also need to build an SFTP SinkFunction. Does the predefined HDFS rolling file sink accept an SFTP connection, given that SFTP is supported by the Hadoop FileSystem?
  3.  Are there any good references on how to write a custom source/sink?
  4.  Is there any similar code you could share?

Thanks!

Regards,
Yow

Re: Flink Custom SourceFunction and SinkFunction

Piotr Nowojski-3
Hi,

I couldn’t find any references related to your question, nor have I seen such a use case, but:

Re 1.
It looks like it could work.

Re 2.
It should work as well, but try using StreamingFileSink.

Re 3.
For a custom source/sink function, it’s quite easy if you do not care about data processing guarantees. If you need at-least-once or exactly-once, things get more complicated.
For an exactly-once sink, you should start from `TwoPhaseCommitSinkFunction`. (For example usages, check the test class `TwoPhaseCommitSinkFunctionTest.ContentDumpSinkFunction`, or the more complicated `FlinkKafkaProducer`.)
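To make the exactly-once lifecycle concrete, here is a framework-free Java sketch of the sequence that `TwoPhaseCommitSinkFunction` drives: records go into an open transaction, the transaction is pre-committed when a checkpoint is taken, and it is committed only once that checkpoint is confirmed. It does not use Flink's API. The method names `beginTransaction`, `invoke`, `preCommit`, `commit` and `abort` mirror the methods a `TwoPhaseCommitSinkFunction` subclass implements, while the class `TwoPhaseSinkSketch`, the `Txn` handle and the in-memory "storage" are hypothetical stand-ins. For SFTP, a transaction might be a temporary file that is renamed on commit; that is an assumption, not something confirmed above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Framework-free sketch of the two-phase-commit lifecycle (hypothetical names, in-memory storage).
class TwoPhaseSinkSketch {
    // Hypothetical transaction handle, e.g. a temporary file on the target server.
    static final class Txn {
        final String name;
        final List<String> lines = new ArrayList<>();
        Txn(String name) { this.name = name; }
    }

    final List<String> committedOutput = new ArrayList<>();        // visible to readers
    private final Map<Long, Txn> pending = new LinkedHashMap<>();  // pre-committed, not yet visible
    private final Set<String> committedNames = new HashSet<>();
    private Txn current;
    private int counter = 0;

    TwoPhaseSinkSketch() { current = beginTransaction(); }

    Txn beginTransaction() { return new Txn("txn-" + counter++); }

    // Records only go into the open transaction; nothing is visible yet.
    void invoke(String record) { current.lines.add(record); }

    // Phase 1 (on checkpoint): seal the open transaction and start a fresh one.
    void snapshotState(long checkpointId) {
        preCommit(current);
        pending.put(checkpointId, current);
        current = beginTransaction();
    }

    void preCommit(Txn txn) {
        // A real sink would flush and close the temporary file here.
    }

    // Phase 2 (checkpoint confirmed): publish every transaction up to that checkpoint.
    void notifyCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, Txn>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Txn> e = it.next();
            if (e.getKey() <= checkpointId) {
                commit(e.getValue());
                it.remove();
            }
        }
    }

    // Idempotent: committing the same transaction twice publishes it only once.
    void commit(Txn txn) {
        if (committedNames.add(txn.name)) {
            committedOutput.addAll(txn.lines);
        }
    }

    // On failure, the open (never pre-committed) transaction is thrown away.
    void abortOpenTransaction() {
        abort(current);
        current = beginTransaction();
    }

    void abort(Txn txn) { txn.lines.clear(); }
}
```

The `commit` method is written to be idempotent because, after a recovery, Flink may ask a sink to commit the same transaction again.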
For an at-least-once sink, you can just flush/sync the output files on snapshot/checkpoint.
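A minimal sketch of the at-least-once variant, again without Flink dependencies: records are buffered, and `snapshotState` flushes the buffer so that nothing received before a checkpoint can be lost. In a real sink this flush would live in `CheckpointedFunction.snapshotState`; the class name, the `bufferLimit` parameter and the `remote` list (standing in for the file on the SFTP server) are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of an at-least-once sink (hypothetical names, in-memory "remote" file).
class AtLeastOnceSinkSketch {
    final List<String> remote = new ArrayList<>();   // stands in for durable output
    private final List<String> buffer = new ArrayList<>();
    private final int bufferLimit;

    AtLeastOnceSinkSketch(int bufferLimit) { this.bufferLimit = bufferLimit; }

    void invoke(String record) {
        buffer.add(record);
        if (buffer.size() >= bufferLimit) {
            flush();  // flushing early is allowed: it can only cause duplicates, never loss
        }
    }

    // Called on checkpoint: everything received so far must be durable before the
    // checkpoint is acknowledged, so a restore never skips records.
    void snapshotState(long checkpointId) {
        flush();
    }

    // Simulates a crash: unflushed records vanish, and the source replays them
    // from the last checkpoint.
    void crash() { buffer.clear(); }

    private void flush() {
        remote.addAll(buffer);  // a real sink would write, then sync/close the file
        buffer.clear();
    }
}
```

If the job fails after an early flush, the source rewinds to the last checkpoint and those records are written a second time; that duplication is exactly what at-least-once permits.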
For a source, you would have to manually keep the input offsets in Flink’s state.
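Keeping offsets in state can be sketched the same way. In Flink, the offset would be stored in operator state through `CheckpointedFunction` (for example a `ListState<Long>`), and the emit-and-advance step would run under `SourceContext.getCheckpointLock()`. Below, a plain lock object and explicit `snapshotState`/`restoreState` methods (hypothetical names) stand in for those, and `input` stands in for the lines of a remote file.

```java
import java.util.ArrayList;
import java.util.List;

// Framework-free sketch of a replayable source that keeps its read offset as state.
class OffsetSourceSketch {
    private final List<String> input;                 // hypothetical: lines of a remote file
    private final Object checkpointLock = new Object();
    private long offset = 0;                          // index of the next line to emit

    OffsetSourceSketch(List<String> input) { this.input = input; }

    // Emits up to maxRecords records. Emitting a record and advancing the offset happen
    // under the same lock as snapshots, so a snapshot never sees one without the other.
    List<String> poll(int maxRecords) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < maxRecords && offset < input.size(); i++) {
            synchronized (checkpointLock) {
                out.add(input.get((int) offset));
                offset++;
            }
        }
        return out;
    }

    // What would be written to Flink state on checkpoint.
    long snapshotState() {
        synchronized (checkpointLock) { return offset; }
    }

    // What would be read back from Flink state on restore.
    void restoreState(long restoredOffset) {
        synchronized (checkpointLock) { offset = restoredOffset; }
    }
}
```

After a failure, a new instance restored from the last snapshot resumes at the saved offset, so records emitted after that checkpoint are read again rather than skipped.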

Re 4.

Regarding SFTP support: not that I’m aware of.
Regarding sources/sinks, you can try looking at existing source/sink implementations.

Piotrek

> On 1 Mar 2019, at 09:39, Siew Wai Yow <[hidden email]> wrote: