Hi everyone,
I'd like to start a discussion about FLIP-115: Filesystem connector in Table [1].

This FLIP will:
- Introduce a Filesystem table factory in Table, supporting the csv/parquet/orc/json/avro formats.
- Introduce a streaming filesystem/Hive sink in Table.

CC'ing the user mailing list: if you have any unmet needs, please feel free to reply.

Looking forward to hearing from you.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

Thanks for FLIP-115. It is a really useful feature for platform developers who manage hundreds of Flink-to-Hive jobs in production.

I think we need to add 'connector.sink.username' for UserGroupInformation when data is written to HDFS.

Hi,
Which actual sinks/sources are you planning to use in this feature? Is it about exposing StreamingFileSink in the Table API? Or do you want to implement new Sinks/Sources?

Piotrek

Hi,
Many thanks to Jingsong for bringing up this discussion! Enhancing the FileSystem connector in Table should largely improve usability.

I have the same question as Piotr. From my side, I think it would be better to reuse the existing StreamingFileSink. We have begun enhancing the supported file formats (e.g., ORC, Avro...), and reusing StreamingFileSink should avoid repeated work in the Table library. Besides, the bucket concept also seems to match the semantics of a partition.

For the notification of adding partitions, I wonder whether the watermark mechanism might not be enough, since a bucket/partition might span multiple subtasks. It depends on the level of notification: if we want to notify for the bucket on each subtask, using the watermark to notify each subtask should be OK, but if we want to notify for the whole bucket/partition, we might also need some coordination between subtasks.

Best,
Yun

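The coordination concern raised above can be sketched roughly as follows. This is only an illustration under stated assumptions, not Flink code: `PartitionCommitCoordinator`, `reportWatermark` and `canCommit` are hypothetical names, and the sketch assumes each sink subtask reports its current watermark to a single coordinator, which treats a partition as complete only once the minimum watermark over all subtasks has reached the partition's end time.

```java
import java.util.Arrays;

/** Hypothetical coordinator: tracks the latest watermark reported by each sink subtask. */
final class PartitionCommitCoordinator {
    private final long[] watermarks;

    PartitionCommitCoordinator(int parallelism) {
        watermarks = new long[parallelism];
        Arrays.fill(watermarks, Long.MIN_VALUE); // no subtask has reported yet
    }

    /** Called when a subtask reports its watermark; watermarks never move backwards. */
    void reportWatermark(int subtask, long watermark) {
        watermarks[subtask] = Math.max(watermarks[subtask], watermark);
    }

    /** The whole partition is complete only when every subtask has passed its end time. */
    boolean canCommit(long partitionEndTime) {
        return Arrays.stream(watermarks).min().getAsLong() >= partitionEndTime;
    }

    public static void main(String[] args) {
        PartitionCommitCoordinator c = new PartitionCommitCoordinator(2);
        c.reportWatermark(0, 3_600_000L);
        System.out.println(c.canCommit(3_600_000L)); // false: subtask 1 has not reported
        c.reportWatermark(1, 3_700_000L);
        System.out.println(c.canCommit(3_600_000L)); // true: both subtasks passed the end time
    }
}
```

A per-subtask notification would only need the local watermark; the minimum across subtasks is what makes this a whole-partition decision.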
Thanks Piotr and Yun for getting involved.

Hi Piotr and Yun, regarding the implementation:

FLINK-14254 [1] introduced the batch sink in the Table world. It deals with partitions, the metastore, and so on, and it simply reuses the DataSet/DataStream FileInputFormat and FileOutputFormat. The filesystem connector cannot do without FileInputFormat, because it needs to deal with files and splits. Formats like ORC and Parquet need to read the whole file and have different split logic.

So, back to the filesystem connector:
- It needs to introduce FilesystemTableFactory, FilesystemTableSource and FilesystemTableSink.
- For sources, it reuses the DataSet/DataStream FileInputFormats; there is no other interface for reading files.

For file sinks:
- The batch sink uses FLINK-14254.
- The streaming sink can be done in two ways.

The first way is reusing the batch sink from FLINK-14254, which already handles the partition and metastore logic well:
- It unifies batch and streaming.
- Using FileOutputFormat is consistent with FileInputFormat.
- It requires adding exactly-once related logic, just 200+ lines of code.
- It is natural to support more table features, like partition commit, auto compaction, etc.

The second way is reusing the DataStream StreamingFileSink:
- It unifies the streaming sink between Table and DataStream.
- It may be hard to introduce table-related features to StreamingFileSink.

I slightly prefer the first way. What do you think?

Hi Yun,

> Watermark mechanism might not be enough.

The watermarks of the subtasks are the same in "snapshotState".

> we might need to also do some coordination between subtasks.

Yes, the JobMaster is the role that controls subtasks. The metastore is a very fragile single point that cannot handle distributed access, so it is accessed uniformly by the JobMaster.

[1] https://issues.apache.org/jira/browse/FLINK-14254

Best,
Jingsong Lee

Thanks Jinhai for getting involved.

> we need add 'connector.sink.username' for UserGroupInformation when data is written to HDFS

Yes. I am not an HDFS expert, but it seems we need to do this "doAs" in the code to access an external HDFS. I will update the document.

Best,
Jingsong Lee

Hi Jingsong,
> First way is reusing Batch sink in FLINK-14254, It has handled the partition and metastore logic well.
> - unify batch and streaming
> - Using FileOutputFormat is consistent with FileInputFormat.
> - Add exactly-once related logic. Just 200+ lines code.
> - It's natural to support more table features, like partition commit, auto compact and etc..
>
> Second way is reusing Datastream StreamingFileSink:
> - unify streaming sink between table and Datastream.
> - It maybe hard to introduce table related features to StreamingFileSink.
>
> I prefer the first way a little. What do you think?

I would be surprised if adding "exactly-once related logic" is just 200 lines of code. There are things like multipart file upload to S3, and there are also some pending features like [1]. I would suggest asking/involving Klou in this discussion.

If it's that easy to support exactly-once streaming with the current batch sink, that raises the question: why do we need to maintain StreamingFileSink?

The worst possible outcome from my perspective would be another example of an operator/logic implemented independently in both the DataStream API and the Table API, because I'm pretty sure they will not be fully compatible, each with its own set of limitations, quirks and features. This matters especially because unifying such operators is on our long-term roadmap and wish list.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11499

Hi Piotr,
I am very torn.

Let me re-list the table streaming sink requirements:
- In Table, maybe 90% of sinks are for Hive. Parquet and ORC are the most important formats. Hive provides RecordWriters; using them makes it easy to support all Hive formats, and we don't need to worry about Hive version compatibility either, but they cannot work with FSDataOutputStream.
- A Hive table may use an external HDFS, which means Hive has its own Hadoop configuration.
- In Table, partition commit is needed. We cannot just move files; updating the catalog is important to complete the table semantics.

You are right that the DataStream and Table streaming sinks will not be fully compatible, each with its own set of limitations, quirks and features. But if we reuse DataStream, batch and streaming will also not be fully compatible. Providing a unified experience for batch and streaming is also important.

Table and DataStream have different concerns, and they lean in different directions.

Of course, it would be very good to see a unified implementation that solves the batch sink and Hive requirements and unifies the DataStream batch sink, DataStream streaming sink, Table batch sink and Table streaming sink.

Let's see what others think.

Best,
Jingsong Lee

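The partition commit requirement listed above (moving files is not enough, the catalog must also be updated) could look roughly like this. It is a minimal in-memory sketch, not a Flink or Hive API: `PartitionCommitSketch` and all of its methods are hypothetical, and plain Java collections stand in for the temporary directory, the final table location and the metastore.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for a table sink's partition commit. */
final class PartitionCommitSketch {
    // Files written to a temporary location, keyed by partition.
    private final Map<String, List<String>> staged = new HashMap<>();
    // Files that have been moved to the final table location.
    private final Map<String, List<String>> finalLocation = new HashMap<>();
    // Stand-in for the catalog/metastore: the partitions that queries can see.
    private final Set<String> catalog = new HashSet<>();

    void stageFile(String partition, String file) {
        staged.computeIfAbsent(partition, p -> new ArrayList<>()).add(file);
    }

    /** Step 1: move the staged files of a partition to the final location. */
    void moveFiles(String partition) {
        List<String> files = staged.remove(partition);
        if (files != null) {
            finalLocation.computeIfAbsent(partition, p -> new ArrayList<>()).addAll(files);
        }
    }

    /** Step 2: register the partition in the catalog, if it has files in place. */
    void registerPartition(String partition) {
        if (finalLocation.containsKey(partition)) {
            catalog.add(partition);
        }
    }

    /** A full partition commit is both steps, in order. */
    void commitPartition(String partition) {
        moveFiles(partition);
        registerPartition(partition);
    }

    boolean isQueryable(String partition) {
        return catalog.contains(partition);
    }
}
```

After `moveFiles` alone the data is in place but `isQueryable` still returns false, which is the gap the requirement describes.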
I would really like to see us converging the stack and the functionality here. That means trying to use the same sinks in the Table API as in the DataStream API, and using the same sink for batch and streaming.

The StreamingFileSink has a lot of things that can help with that. If possible, it would be nice to extend it (which would help move towards the above goal) rather than build a second sink. Building a second sink leads us further away from unification.

I am a bit puzzled by the statement that sinks are primarily for Hive. The Table API should not be coupled to Hive; it should be an independent batch/streaming API for many use cases, supporting batch and streaming interplay very well. Supporting Hive is great, but we should not be building this towards Hive, as just yet another Hive runtime. Why "yet another Hive runtime" when we have a unique streaming engine that can do much more? We would drop our own strength and reduce ourselves to a limited subset.

Let's build a File Sink that can also support Hive, but can do so much more. For example, efficient streaming file ingestion as materialized views from changelogs.

*## Writing Files in Streaming*

To write files in streaming, I don't see another way than using the streaming file sink. If you want to write files across checkpoints, support exactly-once, and support consistent "stop with savepoint", it is not trivial.

A part of the complexity comes from the fact that not all targets are actually file systems, and not all have simple semantics for persistence. S3, for example, does not support renames (only copies, which may take a lot of time) and it does not support flush/sync of data (the S3 file system in Hadoop exposes that, but it does not work: flush/sync followed by a failure leads to data loss). You need to devise a separate protocol for that, which is exactly what has already been done and abstracted behind the recoverable writers.

If you re-engineer that, you will end up either missing many things (intermediate persistence on different file systems, atomic commit in the absence of renames, etc.), or doing something similar to what the recoverable writers do.

*## Atomic Commit in Batch*

For batch sinks, it is also desirable to write the data first and then atomically commit it once the job is done. Hadoop has spent a lot of time making this work; see this doc, specifically the section on 'The "Magic" Committer' [1].

What Flink has built in the RecoverableWriter is in some way an even better version of this, because it works without extra files (we pass data through checkpoint state) and it supports not only committing once at the end, but also committing intermediate parts multiple times during checkpoints.

This means that using the recoverable writer mechanism in batch would allow us to immediately get the efficient atomic commit implementations on file://, hdfs:// and s3://, with a well-defined way to implement it for other file systems as well.

*## Batch / Streaming Unification*

It would be great to start looking at these things in the same way:
- streaming (exactly-once): commits files (after they are finished) at the next checkpoint
- batch: single commit at the end of the job

*## DataStream / Table API Stack Unification*

Having the same set of capabilities would make it much easier for users to understand the system, especially when it comes to consistent behavior across external systems. Having a different file sink in the Table API and the DataStream API means that DataStream can write correctly to S3 while the Table API cannot.

*## What is missing?*

It seems there are some things that get in the way of naturally

Can you make a list of what features are missing in the StreamingFileSink that would make it usable for the use cases you have in mind?

Best,
Stephan

[1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html

Hi Jingsong,

I am looking forward to this feature, because some streaming applications need to transfer their messages to HDFS for offline analysis.

Best wishes,
LakeShen

Hi Stephan,

Thanks very much for your detailed reply.

*## StreamingFileSink does not support writers with a path*

The FLIP is "Filesystem connector in Table"; it is about building up Flink Table's capabilities. But I think Hive is important: I see that most users use Flink and Spark to write data from Kafka to Hive, and both engines are convenient and popular for streaming writes. I mean, Flink is not only a Hive runtime, but also an important part of the offline data warehouse.

The thing is that StreamingFileSink does not support Hadoop record writers. Yes, we can support them one by one, and I see the community integrating ORC [1]. But it is really not an easy thing, and we have to be careful to maintain compatibility. After all, downstream users analyze the data with other computing engines.

Yes, exposing "RecoverableFsDataOutputStream" to writers is good for subsequent optimization [2]. But in many cases it is enough for users to generate new files at each checkpoint. They pay more attention to whether it works at all and whether there is a compatibility risk. Therefore, RecordWriter is used here.

*## External HDFS access*

This includes the Hadoop configuration and Kerberos-related things.

*## Partition commit*

Committing a partition notifies the downstream application that the partition has finished writing and is ready to be read. The common way is to add a success file or update the metastore. Of course, there are other ways to notify, so we need to provide flexible mechanisms. As you mentioned, yes, we can extend "StreamingFileSink" for this part.

*## Batch / Streaming Unification*

Yes, it is about exactly-once and a single commit at the end. There are also some "bounded" differences. For example, batch can support sorting: you can sort by partition, which reduces the number of writers open at the same time. Dynamic partition writing in batch may produce many unordered partitions.
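To make the "Partition commit" idea above more concrete, here is a minimal sketch of a pluggable commit mechanism. Every name in it (`PartitionCommitPolicy`, `SuccessFilePolicy`, `RecordingMetastorePolicy`, `PartitionCommitter`) is hypothetical and not an existing Flink interface. The `_SUCCESS` marker name follows the common Hadoop convention and is an assumption here, and the metastore policy only records partition names where a real one would call the catalog.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical hook: one way of telling downstream readers a partition is complete. */
interface PartitionCommitPolicy {
    void commit(Path partitionDir) throws IOException;
}

/** Adds an empty marker file to the finished partition directory. */
final class SuccessFilePolicy implements PartitionCommitPolicy {
    @Override
    public void commit(Path partitionDir) throws IOException {
        Path marker = partitionDir.resolve("_SUCCESS");
        if (!Files.exists(marker)) {
            Files.createFile(marker);
        }
    }
}

/** Stand-in for a metastore update; it only remembers the partition names. */
final class RecordingMetastorePolicy implements PartitionCommitPolicy {
    final List<String> registered = new ArrayList<>();

    @Override
    public void commit(Path partitionDir) {
        registered.add(partitionDir.getFileName().toString());
    }
}

/** Runs every configured policy once a partition has finished writing. */
final class PartitionCommitter {
    private final List<PartitionCommitPolicy> policies;

    PartitionCommitter(List<PartitionCommitPolicy> policies) {
        this.policies = policies;
    }

    void commit(Path partitionDir) throws IOException {
        for (PartitionCommitPolicy policy : policies) {
            policy.commit(partitionDir);
        }
    }
}
```

Keeping the notification behind a small interface is one way to get the flexibility mentioned above: a success file, a metastore update, or any other notification can be combined without changing the sink itself.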
[1] https://issues.apache.org/jira/browse/FLINK-10114
[2] https://issues.apache.org/jira/browse/FLINK-11499

Best,
Jingsong Lee
> > > > >>> > > > > >>> I have the same question with Piotr. From my side, I think > it > > > > >>> should be better to be able to reuse existing StreamingFileSink. > I > > > > think We > > > > >>> have began > > > > >>> enhancing the supported FileFormat (e.g., ORC, Avro...), > and > > > > >>> reusing StreamFileSink should be able to avoid repeat work in the > > > Table > > > > >>> library. Besides, > > > > >>> the bucket concept seems also matches the semantics of > > > partition. > > > > >>> > > > > >>> For the notification of adding partitions, I'm a little > > > wondering > > > > >>> that the Watermark mechanism might not be enough since > > > Bucket/Partition > > > > >>> might spans > > > > >>> multiple subtasks. It depends on the level of notification: > > if > > > we > > > > >>> want to notify for the bucket on each subtask, using watermark to > > > > notifying > > > > >>> each subtask > > > > >>> should be ok, but if we want to notifying for the whole > > > > >>> Bucket/Partition, we might need to also do some coordination > > between > > > > >>> subtasks. > > > > >>> > > > > >>> > > > > >>> Best, > > > > >>> Yun > > > > >>> > > > > >>> > > > > >>> > > > > >>> > ------------------------------------------------------------------ > > > > >>> From:Piotr Nowojski <[hidden email]> > > > > >>> Send Time:2020 Mar. 13 (Fri.) 18:03 > > > > >>> To:dev <[hidden email]> > > > > >>> Cc:user <[hidden email]>; user-zh < > [hidden email] > > > > > > > >>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in Table > > > > >>> > > > > >>> Hi, > > > > >>> > > > > >>> > > > > >>> Which actual sinks/sources are you planning to use in this > feature? > > > Is > > > > it about exposing StreamingFileSink in the Table API? Or do you want > to > > > > implement new Sinks/Sources? > > > > >>> > > > > >>> Piotrek > > > > >>> > > > > >>>> On 13 Mar 2020, at 10:04, jinhai wang <[hidden email]> > > wrote: > > > > >>>> > > > > >>> > > > > >>>> Thanks for FLIP-115. 
It is really useful feature for platform > > > > developers who manage hundreds of Flink to Hive jobs in production. > > > > >>> > > > > >>>> I think we need add 'connector.sink.username' for > > > > UserGroupInformation when data is written to HDFS > > > > >>>> > > > > >>>> > > > > >>>> 在 2020/3/13 下午3:33,“Jingsong Li”<[hidden email]> 写入: > > > > >>>> > > > > >>>> Hi everyone, > > > > >>>> > > > > >>> > > > > >>>> I'd like to start a discussion about FLIP-115 Filesystem > > connector > > > > in Table > > > > >>>> [1]. > > > > >>>> This FLIP will bring: > > > > >>>> - Introduce Filesystem table factory in table, support > > > > >>>> csv/parquet/orc/json/avro formats. > > > > >>>> - Introduce streaming filesystem/hive sink in table > > > > >>>> > > > > >>> > > > > >>>> CC to user mail list, if you have any unmet needs, please feel > > > free > > > > to > > > > >>>> reply~ > > > > >>>> > > > > >>>> Look forward to hearing from you. > > > > >>>> > > > > >>>> [1] > > > > >>>> > > > > >>> > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > > > > >>>> > > > > >>>> Best, > > > > >>>> Jingsong Lee > > > > >>>> > > > > >>>> > > > > >>>> > > > > >>> > > > > >>> > > > > >>> > > > > >> > > > > >> -- > > > > >> Best, Jingsong Lee > > > > >> > > > > > > > > > > > > > > > -- > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > -- > > > Best, Jingsong Lee > > > > > > -- Best, Jingsong Lee |
>> The FLIP is "Filesystem connector in Table", it's about building up Flink Table's capabilities.

That is exactly what worries me. The whole effort is not thinking about Flink as a whole any more.
This proposal is not trying to build a consistent user experience across batch and streaming, across Table API, SQL, and DataStream.

The proposal is building a separate, disconnected ecosystem for the Table API, specific to batch processing and some limited streaming setups. It is specific to one type of environment (Hive and HDFS). It willingly omits support for other environments and conflicts with efforts in other components to unify.

Supporting common use cases is good, but in my opinion not at the price of creating a "fractured" project where the approaches in different layers don't fit together any more.

> *## StreamingFileSink not support writer with path*
>
> The FLIP is "Filesystem connector in Table", it's about building up Flink Table's capabilities. But I think Hive is important: I see that most users use Flink and Spark to write data from Kafka to Hive. For streaming writes, I see that these two engines are convenient and popular. I mean, Flink is not only a Hive runtime, but also an important part of the offline data warehouse.
> The thing is that StreamingFileSink does not support Hadoop record writers. Yes, we can support them one by one. I see the community integrating ORC [1]. But it's really not an easy thing, and we have to be careful to maintain compatibility. After all, downstream users analyze the data with other computing engines.
> Yes, exposing "RecoverableFsDataOutputStream" to writers is good for subsequent optimization [2]. But in many cases it is enough for users to generate new files at the checkpoint. They pay more attention to whether they can do it and whether there is a compatibility risk. Therefore, RecordWriter is used here.
>
> *## External HDFS access*
>
> Including Hadoop configuration and Kerberos-related things.
>
> *## Partition commit*
>
> Committing a partition notifies the downstream application that the partition has finished writing and is ready to be read. The common way is to add a success file or update the metastore. Of course, there are other ways to notify. We need to provide flexible mechanisms.
> As you mentioned, yes, we can extend "StreamingFileSink" for this part.
>
> *## Batch / Streaming Unification*
>
> Yes, it is about exactly-once and a single commit at the end. There are also some "bounded" differences. For example, batch can support sorting. In this way, you can sort by partition, which reduces the number of writers open at the same time. Dynamic partition writing in batch may produce many unordered partitions.
>
> [1] https://issues.apache.org/jira/browse/FLINK-10114
> [2] https://issues.apache.org/jira/browse/FLINK-11499
>
> Best,
> Jingsong Lee
|
Hi all,

I also agree with Stephan on this!

It has been more than a year now that most of our efforts have had "unify" / "unification" etc. either in their title or at their core, and this has been the focus of all our resources. By deviating from this now, we only put more stress on other teams in the future. When users start using a given API, with high probability they will ask for consistent behaviour from all the other APIs that ship with Flink (and that is totally reasonable). This will eventually lead to having to answer the questions that we now deem difficult in a future release, when we will have to "unify" again.

In addition, Hive integration is definitely a "nice to have" feature, but that does not mean we need to push for 100% compatibility if it is not required.
@Jingsong Li, if you think that Parquet and ORC are the main formats, we can focus on these and provide good support for them (both reading and writing). For maintainability, I think that given the amount of demand for these formats, it is not going to be a huge problem, at least for now.

Given the above, I am also leaning towards a solution that aims at extending the StreamingFileSink to efficiently support bulk formats like Parquet and ORC, rather than creating a new sink that locks Hive-dependent use cases to a specific API.

Cheers,
Kostas

On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <[hidden email]> wrote:
>
> >> The FLIP is "Filesystem connector in Table", it's about building up Flink Table's capabilities.
>
> That is exactly what worries me. The whole effort is not thinking about Flink as a whole any more.
> This proposal is not trying to build a consistent user experience across batch and streaming, across Table API, SQL, and DataStream.
>
> The proposal is building a separate, disconnected ecosystem for the Table API, specific to batch processing and some limited streaming setups. Specific to one type of environment (Hive and HDFS). It willingly omits support for other environments and conflicts with efforts in other components to unify.
>
> Supporting common use cases is good, but in my opinion not at the price of creating a "fractured" project where the approaches in different layers don't fit together any more.
> > > > > > > - It's natural to support more table features, like partition > > > commit, > > > > > > auto compact and etc.. > > > > > > > > > > > > > > Second way is reusing Datastream StreamingFileSink: > > > > > > > - unify streaming sink between table and Datastream. > > > > > > > - It maybe hard to introduce table related features to > > > > > StreamingFileSink. > > > > > > > > > > > > > > I prefer the first way a little. What do you think? > > > > > > > > > > > > I would be surprised if adding “exactly-once related logic” is just > > > 200 > > > > > > lines of code. There are things like multi part file upload to s3 > > and > > > > > there > > > > > > are also some pending features like [1]. I would suggest to > > > ask/involve > > > > > > Klou in this discussion. > > > > > > > > > > > > If it’s as easy to support exactly-once streaming with current > > batch > > > > > sink, > > > > > > that begs the question, why do we need to maintain > > StreamingFileSink? > > > > > > > > > > > > The worst possible outcome from my perspective will be, if we have > > > > > another > > > > > > example of an operator/logic implemented independently both in > > > > DataStream > > > > > > API and Table API. Because I’m pretty sure they will not be fully > > > > > > compatible, each with it’s own set of limitations, quirks and > > > features. > > > > > > Especially that we have on our long term roadmap and wish list to > > > unify > > > > > > such kind of operators. > > > > > > > > > > > > Piotrek > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-11499 < > > > > > > https://issues.apache.org/jira/browse/FLINK-11499> > > > > > > > > > > > > > On 16 Mar 2020, at 06:55, Jingsong Li <[hidden email]> > > > > wrote: > > > > > > > > > > > > > > Thanks Jinhai for involving. 
> > > > > > > > > > > > > >> we need add 'connector.sink.username' for UserGroupInformation > > > when > > > > > data > > > > > > > is written to HDFS > > > > > > > > > > > > > > Yes, I am not an expert of HDFS, but it seems we need do this > > > "doAs" > > > > in > > > > > > the > > > > > > > code for access external HDFS. I will update document. > > > > > > > > > > > > > > Best, > > > > > > > Jingsong Lee > > > > > > > > > > > > > > On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li < > > > [hidden email] > > > > > > > > > > > wrote: > > > > > > > > > > > > > >> Thanks Piotr and Yun for involving. > > > > > > >> > > > > > > >> Hi Piotr and Yun, for implementation, > > > > > > >> > > > > > > >> FLINK-14254 [1] introduce batch sink table world, it deals with > > > > > > partitions > > > > > > >> thing, metastore thing and etc.. And it just reuse > > > > Dataset/Datastream > > > > > > >> FileInputFormat and FileOutputFormat. Filesystem can not do > > > without > > > > > > >> FileInputFormat, because it need deal with file things, split > > > > things. > > > > > > Like > > > > > > >> orc and parquet, they need read whole file and have different > > > split > > > > > > logic. > > > > > > >> > > > > > > >> So back to file system connector: > > > > > > >> - It needs introducing FilesystemTableFactory, > > > FilesystemTableSource > > > > > and > > > > > > >> FilesystemTableSink. > > > > > > >> - For sources, reusing Dataset/Datastream FileInputFormats, > > there > > > > are > > > > > no > > > > > > >> other interface to finish file reading. > > > > > > >> > > > > > > >> For file sinks: > > > > > > >> - Batch sink use FLINK-14254 > > > > > > >> - Streaming sink has two ways. > > > > > > >> > > > > > > >> First way is reusing Batch sink in FLINK-14254, It has handled > > the > > > > > > >> partition and metastore logic well. > > > > > > >> - unify batch and streaming > > > > > > >> - Using FileOutputFormat is consistent with FileInputFormat. 
> > > > > > >> - Add exactly-once related logic. Just 200+ lines code. > > > > > > >> - It's natural to support more table features, like partition > > > > commit, > > > > > > auto > > > > > > >> compact and etc.. > > > > > > >> > > > > > > >> Second way is reusing Datastream StreamingFileSink: > > > > > > >> - unify streaming sink between table and Datastream. > > > > > > >> - It maybe hard to introduce table related features to > > > > > > StreamingFileSink. > > > > > > >> > > > > > > >> I prefer the first way a little. What do you think? > > > > > > >> > > > > > > >> Hi Yun, > > > > > > >> > > > > > > >>> Watermark mechanism might not be enough. > > > > > > >> > > > > > > >> Watermarks of subtasks are the same in the "snapshotState". > > > > > > >> > > > > > > >>> we might need to also do some coordination between subtasks. > > > > > > >> > > > > > > >> Yes, JobMaster is the role to control subtasks. Metastore is a > > > very > > > > > > >> fragile single point, which can not be accessed by distributed, > > so > > > > it > > > > > is > > > > > > >> uniformly accessed by JobMaster. > > > > > > >> > > > > > > >> [1]https://issues.apache.org/jira/browse/FLINK-14254 > > > > > > >> > > > > > > >> Best, > > > > > > >> Jingsong Lee > > > > > > >> > > > > > > >> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <[hidden email]> > > > > wrote: > > > > > > >> > > > > > > >>> Hi, > > > > > > >>> > > > > > > >>> Very thanks for Jinsong to bring up this discussion! It > > > > should > > > > > > >>> largely improve the usability after enhancing the FileSystem > > > > > connector > > > > > > in > > > > > > >>> Table. > > > > > > >>> > > > > > > >>> I have the same question with Piotr. From my side, I > > think > > > it > > > > > > >>> should be better to be able to reuse existing > > StreamingFileSink. 
> > > I > > > > > > think We > > > > > > >>> have began > > > > > > >>> enhancing the supported FileFormat (e.g., ORC, Avro...), > > > and > > > > > > >>> reusing StreamFileSink should be able to avoid repeat work in > > the > > > > > Table > > > > > > >>> library. Besides, > > > > > > >>> the bucket concept seems also matches the semantics of > > > > > partition. > > > > > > >>> > > > > > > >>> For the notification of adding partitions, I'm a little > > > > > wondering > > > > > > >>> that the Watermark mechanism might not be enough since > > > > > Bucket/Partition > > > > > > >>> might spans > > > > > > >>> multiple subtasks. It depends on the level of > > notification: > > > > if > > > > > we > > > > > > >>> want to notify for the bucket on each subtask, using watermark > > to > > > > > > notifying > > > > > > >>> each subtask > > > > > > >>> should be ok, but if we want to notifying for the whole > > > > > > >>> Bucket/Partition, we might need to also do some coordination > > > > between > > > > > > >>> subtasks. > > > > > > >>> > > > > > > >>> > > > > > > >>> Best, > > > > > > >>> Yun > > > > > > >>> > > > > > > >>> > > > > > > >>> > > > > > > >>> > > > ------------------------------------------------------------------ > > > > > > >>> From:Piotr Nowojski <[hidden email]> > > > > > > >>> Send Time:2020 Mar. 13 (Fri.) 18:03 > > > > > > >>> To:dev <[hidden email]> > > > > > > >>> Cc:user <[hidden email]>; user-zh < > > > [hidden email] > > > > > > > > > > > >>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in Table > > > > > > >>> > > > > > > >>> Hi, > > > > > > >>> > > > > > > >>> > > > > > > >>> Which actual sinks/sources are you planning to use in this > > > feature? > > > > > Is > > > > > > it about exposing StreamingFileSink in the Table API? Or do you > > want > > > to > > > > > > implement new Sinks/Sources? 
> > > > > > >>> > > > > > > >>> Piotrek > > > > > > >>> > > > > > > >>>> On 13 Mar 2020, at 10:04, jinhai wang <[hidden email]> > > > > wrote: > > > > > > >>>> > > > > > > >>> > > > > > > >>>> Thanks for FLIP-115. It is really useful feature for platform > > > > > > developers who manage hundreds of Flink to Hive jobs in production. > > > > > > >>> > > > > > > >>>> I think we need add 'connector.sink.username' for > > > > > > UserGroupInformation when data is written to HDFS > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> 在 2020/3/13 下午3:33,“Jingsong Li”<[hidden email]> 写入: > > > > > > >>>> > > > > > > >>>> Hi everyone, > > > > > > >>>> > > > > > > >>> > > > > > > >>>> I'd like to start a discussion about FLIP-115 Filesystem > > > > connector > > > > > > in Table > > > > > > >>>> [1]. > > > > > > >>>> This FLIP will bring: > > > > > > >>>> - Introduce Filesystem table factory in table, support > > > > > > >>>> csv/parquet/orc/json/avro formats. > > > > > > >>>> - Introduce streaming filesystem/hive sink in table > > > > > > >>>> > > > > > > >>> > > > > > > >>>> CC to user mail list, if you have any unmet needs, please > > feel > > > > > free > > > > > > to > > > > > > >>>> reply~ > > > > > > >>>> > > > > > > >>>> Look forward to hearing from you. > > > > > > >>>> > > > > > > >>>> [1] > > > > > > >>>> > > > > > > >>> > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > > > > > > >>>> > > > > > > >>>> Best, > > > > > > >>>> Jingsong Lee > > > > > > >>>> > > > > > > >>>> > > > > > > >>>> > > > > > > >>> > > > > > > >>> > > > > > > >>> > > > > > > >> > > > > > > >> -- > > > > > > >> Best, Jingsong Lee > > > > > > >> > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > > > > > > > > > -- > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > > > > > -- > > Best, Jingsong Lee > > |
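For readers following the quoted discussion above: Stephan's "Batch / Streaming Unification" point (streaming commits finished files at the next checkpoint, batch commits once at the end of the job) can be illustrated with a minimal sketch. This is not Flink code. `PendingFileCommitter`, its method names, and the `.pending` naming scheme are all hypothetical, and the sketch relies on an atomic rename, which works on a local or HDFS-like file system but, as the thread notes, is not available on S3 (which is why the recoverable writers use a different protocol there).

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not a Flink class: stages files and publishes them only on commit. */
class PendingFileCommitter {
    private static final String SUFFIX = ".pending";

    private final Path targetDir;
    private final List<Path> pending = new ArrayList<>();

    PendingFileCommitter(Path targetDir) {
        this.targetDir = targetDir;
    }

    /** Writes a finished file under a hidden staging name that readers are expected to ignore. */
    void writeFinishedFile(String name, String content) throws IOException {
        Path staged = targetDir.resolve("." + name + SUFFIX);
        Files.write(staged, content.getBytes(StandardCharsets.UTF_8));
        pending.add(staged);
    }

    /** Publishes every staged file via an atomic rename and returns how many were published. */
    int commit() throws IOException {
        int committed = 0;
        for (Path staged : pending) {
            String stagedName = staged.getFileName().toString();
            // Strip the leading "." and the trailing ".pending" to get the visible name.
            String finalName = stagedName.substring(1, stagedName.length() - SUFFIX.length());
            Files.move(staged, targetDir.resolve(finalName), StandardCopyOption.ATOMIC_MOVE);
            committed++;
        }
        pending.clear();
        return committed;
    }
}
```

Under these assumptions, a streaming job would call `commit()` each time a checkpoint completes, while a batch job would call it exactly once when the job finishes; the staging and publishing logic is the same in both cases.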
Hi all,
Thanks for the discussion and feedback. I think this FLIP doesn't imply the implementation of such a connector yet; it only describes the functionality and expected behavior from the user's perspective. Reusing the current StreamingFileSink is definitely one of the possible ways to implement it. Since there are a lot of details, I would suggest we have an offline meeting to discuss how these could be achieved by extending StreamingFileSink, and how much effort we would need to put into it. What do you think?

Best,
Kurt

On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <[hidden email]> wrote:

> Hi all,
>
> I also agree with Stephan on this!
>
> It has been more than a year now that most of our efforts have had the
> "unify" / "unification"/ etc either on their title or in their core
> and this has been the focus of all our resources. By deviating from
> this now, we only put more stress on other teams in the future. When
> the users start using a given API, with high probability, they will
> ask (and it is totally reasonable) consistent behaviour from all the
> other APIs that ship with Flink. This will eventually lead to having
> to answer the questions that we now deem as difficult in a future
> release, when we will have to "unify" again.
>
> In addition, Hive integration is definitely a "nice to have" feature
> but it does not mean that we need to push for 100% compatibility if it
> is not required.
> @Jingsong Li if you think that Parquet and Orc are the main formats,
> we can focus on these and provide good support for them (both reading
> and writing). For maintainability, I think that given the amount of
> demand for these formats, it is not going to be a huge problem at
> least for now.
>
> Given the above, I am also leaning towards a solution that aims at
> extending the StreamingFileSink to efficiently support bulk formats
> like Parquet and Orc, rather than creating a new sink that locks
> Hive-dependent usecases to a specific API.
>
> Cheers,
> Kostas
>
> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <[hidden email]> wrote:
> >
> > >> The FLIP is "Filesystem connector in Table", it's about building up
> > >> Flink Table's capabilities.
> >
> > That is exactly what worries me. The whole effort is not thinking about
> > Flink as a whole any more.
> > This proposal is not trying to build a consistent user experience across
> > batch and streaming, across Table API, SQL, and DataStream.
> >
> > The proposal is building a separate, disconnected ecosystem for the Table
> > API, specific to batch processing and some limited streaming setups.
> > Specific to one type of environment (Hive and HDFS). It willingly omits
> > support for other environments and conflicts with efforts in other
> > components to unify.
> >
> > Supporting common use cases is good, but in my opinion not at the price of
> > creating a "fractured" project where the approaches in different layers
> > don't fit together any more.
> >
> > > *## StreamingFileSink not support writer with path*
> > >
> > > The FLIP is "Filesystem connector in Table", it's about building up Flink
> > > Table's capabilities. But I think Hive is important, I see that most users
> > > use Flink and Spark to write data from Kafka to hive. Streaming writing, I
> > > see that these two engines are convenient and popular. I mean, Flink is not
> > > only a hive runtime, but also an important part of offline data warehouse.
> > > The thing is StreamingFileSink not support hadoop record writers. Yes, we
> > > can support them one by one. I see the community integrating ORC [1]. But
> > > it's really not an easy thing. And we have to be careful to maintain
> > > compatibility. After all, users downstream use other computing engines to
> > > analyze.
> > > Yes, exposing "RecoverableFsDataOutputStream" to writers is good to
> > > subsequent optimization [2]. But there are many cases. It is enough for
> > > users to generate new files at the checkpoint. They pay more attention to
> > > whether they can do it and whether there is a risk of compatibility.
> > > Therefore, RecordWriter is used here.
> > >
> > > *## External HDFS access*
> > >
> > > Including hadoop configuration and Kerberos related things.
> > >
> > > *## Partition commit*
> > >
> > > Committing a partition is to notify the downstream application that the
> > > partition has finished writing, the partition is ready to be read. The
> > > common way is to add a success file or update metastore. Of course, there
> > > are other ways to notify. We need to provide flexible mechanisms.
> > > As you mentioned, yes, we can extend "StreamingFileSink" for this part.
> > >
> > > *## Batch / Streaming Unification*
> > >
> > > Yes, it is about exactly-once and single commit at the end, There are also
> > > some "bounded" differences. For example, batch can support sorting. In this
> > > way, you can sort by partition, which can reduce the number of writers
> > > written at the same time. Dynamic partition writing in batch may produce
> > > many unordered partitions.
> > >
> > > [1] https://issues.apache.org/jira/browse/FLINK-10114
> > > [2] https://issues.apache.org/jira/browse/FLINK-11499
> > >
> > > Best,
> > > Jingsong Lee
> > >
> > > On Tue, Mar 17, 2020 at 8:00 PM LakeShen <[hidden email]> wrote:
> > >
> > > > Hi Jingsong,
> > > >
> > > > I am looking forward this feature. Because in some streaming application, it
> > > > need transfer their messages to hdfs, in order to offline analysis.
> > > >
> > > > Best wishes,
> > > > LakeShen
> > > >
> > > > On Tue, Mar 17, 2020 at 7:42 PM Stephan Ewen <[hidden email]> wrote:
> > > >
> > > > > I would really like to see us converging the stack and the functionality
> > > > > here.
> > > > > Meaning to try and use the same sinks in the Table API as for the
> > > > > DataStream API, and using the same sink for batch and streaming.
> > > > > > > > > > The StreamingFileSink has a lot of things that can help with that. > If > > > > > possible, it would be nice to extend it (which would help move > towards > > > > the > > > > > above goal) rather than build a second sink. Building a second sink > > > leads > > > > > us further away from unification. > > > > > > > > > > I am a bit puzzled by the statement that sinks are primarily for > Hive. > > > > The > > > > > Table API should not be coupled to Hive, it should be an > independent > > > > > batch/streaming API for many use cases, supporting very well for > batch > > > > and > > > > > streaming interplay. Supporting Hive is great, but we should not be > > > > > building this towards Hive, as just yet another Hive runtime. Why > "yet > > > > > another Hive runtime" when what we have a unique streaming engine > that > > > > can > > > > > do much more? We would drop our own strength and reduce ourselves > to a > > > > > limited subset. > > > > > > > > > > Let's build a File Sink that can also support Hive, but can do so > much > > > > > more. For example, efficient streaming file ingestion as > materialized > > > > views > > > > > from changelogs. > > > > > > > > > > > > > > > *## Writing Files in Streaming* > > > > > > > > > > To write files in streaming, I don't see another way than using the > > > > > streaming file sink. If you want to write files across checkpoints, > > > > support > > > > > exactly-once, and support consistent "stop with savepoint", it is > not > > > > > trivial. > > > > > > > > > > A part of the complexity comes from the fact that not all targets > are > > > > > actually file systems, and not all have simple semantics for > > > persistence. > > > > > S3 for example does not support renames (only copies, which may > take a > > > > lot > > > > > of time) and it does not support flush/sync of data (the S3 file > system > > > > in > > > > > Hadoop exposes that but it does not work. 
flush/sync, followed by a > > > > > failure, leads to data loss). You need to devise a separate > protocol > > > for > > > > > that, which is exactly what has already been done and abstracted > behind > > > > the > > > > > recoverable writers. > > > > > > > > > > If you re-engineer that in the, you will end up either missing many > > > > things > > > > > (intermediate persistence on different file systems, and atomic > commit > > > in > > > > > the absence of renames, etc.), or you end up doing something > similar as > > > > the > > > > > recoverable writers do. > > > > > > > > > > > > > > > *## Atomic Commit in Batch* > > > > > > > > > > For batch sinks, it is also desirable to write the data first and > then > > > > > atomically commit it once the job is done. > > > > > Hadoop has spent a lot of time making this work, see this doc here, > > > > > specifically the section on 'The "Magic" Committer'. [1] > > > > > > > > > > What Flink has built in the RecoverableWriter is in some way an > even > > > > better > > > > > version of this, because it works without extra files (we pass data > > > > through > > > > > checkpoint state) and it supports not only committing once at the > end, > > > > but > > > > > committing multiple time intermediate parts during checkpoints. > > > > > > > > > > Meaning using the recoverable writer mechanism in batch would > allow us > > > to > > > > > immediately get the efficient atomic commit implementations on > file:// > > > > > hdfs:// and s3://, with a well defined way to implement it also for > > > other > > > > > file systems. 
> > > > > > > > > > > > > > > *## Batch / Streaming Unification* > > > > > > > > > > It would be great to start looking at these things in the same way: > > > > > - streaming (exactly-once): commits files (after finished) at the > > > next > > > > > checkpoint > > > > > - batch: single commit at the end of the job > > > > > > > > > > > > > > > *## DataStream / Table API Stack Unification* > > > > > > > > > > Having the same set of capabilities would make it much easier for > users > > > > to > > > > > understand the system. > > > > > Especially when it comes to consistent behavior across external > > > systems. > > > > > Having a different file sink in Table API and DataStream API means > that > > > > > DataStream can write correctly to S3 while Table API cannot. > > > > > > > > > > > > > > > *## What is missing?* > > > > > > > > > > It seems there are some things that get in the way of naturally > > > > > Can you make a list of what features are missing in the > > > StreamingFileSink > > > > > that make it usable for the use cases you have in mind? > > > > > > > > > > Best, > > > > > Stephan > > > > > > > > > > [1] > > > > > > > > > > > > > > > > > > https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html > > > > > > > > > > > > > > > On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li < > [hidden email]> > > > > > wrote: > > > > > > > > > > > Hi Piotr, > > > > > > > > > > > > I am very entangled. > > > > > > > > > > > > Let me re-list the table streaming sink requirements: > > > > > > - In table, maybe 90% sinks are for Hive. The parquet and orc > are the > > > > > most > > > > > > important formats. Hive provide RecordWriters, it is easy to > support > > > > all > > > > > > hive formats by using it, and we don't need concern hive version > > > > > > compatibility too, but it can not work with FSDataOutputStream. > > > > > > - Hive table maybe use external HDFS. It means, hive has its own > > > hadoop > > > > > > configuration. 
> > > > > > - In table, partition commit is needed, we can not just move > files, > > > it > > > > is > > > > > > important to complete table semantics to update catalog. > > > > > > > > > > > > You are right DataStream and Table streaming sink will not be > fully > > > > > > compatible, each with its own set of limitations, quirks and > > > features. > > > > > > But if re-using DataStream, batch and streaming also will not be > > > fully > > > > > > compatible. Provide a unify experience to batch and streaming is > also > > > > > > important. > > > > > > > > > > > > Table and DataStream have different concerns, and they tilt in > > > > different > > > > > > directions. > > > > > > > > > > > > Of course, it is very good to see a unify implementation to solve > > > batch > > > > > > sink and hive things, unify DataStream batch sink and DataStream > > > > > streaming > > > > > > sink and Table batch sink and Table streaming sink. > > > > > > > > > > > > Le's see what others think. > > > > > > > > > > > > Best, > > > > > > Jingsong Lee > > > > > > > > > > > > > > > > > > On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski < > [hidden email]> > > > > > > wrote: > > > > > > > > > > > > > Hi Jingsong, > > > > > > > > > > > > > > > First way is reusing Batch sink in FLINK-14254, It has > handled > > > the > > > > > > > partition and metastore logic well. > > > > > > > > - unify batch and streaming > > > > > > > > - Using FileOutputFormat is consistent with FileInputFormat. > > > > > > > > - Add exactly-once related logic. Just 200+ lines code. > > > > > > > > - It's natural to support more table features, like partition > > > > commit, > > > > > > > auto compact and etc.. > > > > > > > > > > > > > > > > Second way is reusing Datastream StreamingFileSink: > > > > > > > > - unify streaming sink between table and Datastream. > > > > > > > > - It maybe hard to introduce table related features to > > > > > > StreamingFileSink. 
> > > > > > > > > > > > > > > > I prefer the first way a little. What do you think? > > > > > > > > > > > > > > I would be surprised if adding “exactly-once related logic” is > just > > > > 200 > > > > > > > lines of code. There are things like multi part file upload to > s3 > > > and > > > > > > there > > > > > > > are also some pending features like [1]. I would suggest to > > > > ask/involve > > > > > > > Klou in this discussion. > > > > > > > > > > > > > > If it’s as easy to support exactly-once streaming with current > > > batch > > > > > > sink, > > > > > > > that begs the question, why do we need to maintain > > > StreamingFileSink? > > > > > > > > > > > > > > The worst possible outcome from my perspective will be, if we > have > > > > > > another > > > > > > > example of an operator/logic implemented independently both in > > > > > DataStream > > > > > > > API and Table API. Because I’m pretty sure they will not be > fully > > > > > > > compatible, each with it’s own set of limitations, quirks and > > > > features. > > > > > > > Especially that we have on our long term roadmap and wish list > to > > > > unify > > > > > > > such kind of operators. > > > > > > > > > > > > > > Piotrek > > > > > > > > > > > > > > [1] https://issues.apache.org/jira/browse/FLINK-11499 < > > > > > > > https://issues.apache.org/jira/browse/FLINK-11499> > > > > > > > > > > > > > > > On 16 Mar 2020, at 06:55, Jingsong Li < > [hidden email]> > > > > > wrote: > > > > > > > > > > > > > > > > Thanks Jinhai for involving. > > > > > > > > > > > > > > > >> we need add 'connector.sink.username' for > UserGroupInformation > > > > when > > > > > > data > > > > > > > > is written to HDFS > > > > > > > > > > > > > > > > Yes, I am not an expert of HDFS, but it seems we need do this > > > > "doAs" > > > > > in > > > > > > > the > > > > > > > > code for access external HDFS. I will update document. 
> > > > > > > > > > > > > > > > Best, > > > > > > > > Jingsong Lee > > > > > > > > > > > > > > > > On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li < > > > > [hidden email] > > > > > > > > > > > > > wrote: > > > > > > > > > > > > > > > >> Thanks Piotr and Yun for involving. > > > > > > > >> > > > > > > > >> Hi Piotr and Yun, for implementation, > > > > > > > >> > > > > > > > >> FLINK-14254 [1] introduce batch sink table world, it deals > with > > > > > > > partitions > > > > > > > >> thing, metastore thing and etc.. And it just reuse > > > > > Dataset/Datastream > > > > > > > >> FileInputFormat and FileOutputFormat. Filesystem can not do > > > > without > > > > > > > >> FileInputFormat, because it need deal with file things, > split > > > > > things. > > > > > > > Like > > > > > > > >> orc and parquet, they need read whole file and have > different > > > > split > > > > > > > logic. > > > > > > > >> > > > > > > > >> So back to file system connector: > > > > > > > >> - It needs introducing FilesystemTableFactory, > > > > FilesystemTableSource > > > > > > and > > > > > > > >> FilesystemTableSink. > > > > > > > >> - For sources, reusing Dataset/Datastream FileInputFormats, > > > there > > > > > are > > > > > > no > > > > > > > >> other interface to finish file reading. > > > > > > > >> > > > > > > > >> For file sinks: > > > > > > > >> - Batch sink use FLINK-14254 > > > > > > > >> - Streaming sink has two ways. > > > > > > > >> > > > > > > > >> First way is reusing Batch sink in FLINK-14254, It has > handled > > > the > > > > > > > >> partition and metastore logic well. > > > > > > > >> - unify batch and streaming > > > > > > > >> - Using FileOutputFormat is consistent with FileInputFormat. > > > > > > > >> - Add exactly-once related logic. Just 200+ lines code. > > > > > > > >> - It's natural to support more table features, like > partition > > > > > commit, > > > > > > > auto > > > > > > > >> compact and etc.. 
> > > > > > > >> > > > > > > > >> Second way is reusing Datastream StreamingFileSink: > > > > > > > >> - unify streaming sink between table and Datastream. > > > > > > > >> - It maybe hard to introduce table related features to > > > > > > > StreamingFileSink. > > > > > > > >> > > > > > > > >> I prefer the first way a little. What do you think? > > > > > > > >> > > > > > > > >> Hi Yun, > > > > > > > >> > > > > > > > >>> Watermark mechanism might not be enough. > > > > > > > >> > > > > > > > >> Watermarks of subtasks are the same in the "snapshotState". > > > > > > > >> > > > > > > > >>> we might need to also do some coordination between > subtasks. > > > > > > > >> > > > > > > > >> Yes, JobMaster is the role to control subtasks. Metastore > is a > > > > very > > > > > > > >> fragile single point, which can not be accessed by > distributed, > > > so > > > > > it > > > > > > is > > > > > > > >> uniformly accessed by JobMaster. > > > > > > > >> > > > > > > > >> [1]https://issues.apache.org/jira/browse/FLINK-14254 > > > > > > > >> > > > > > > > >> Best, > > > > > > > >> Jingsong Lee > > > > > > > >> > > > > > > > >> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao < > [hidden email]> > > > > > wrote: > > > > > > > >> > > > > > > > >>> Hi, > > > > > > > >>> > > > > > > > >>> Very thanks for Jinsong to bring up this discussion! > It > > > > > should > > > > > > > >>> largely improve the usability after enhancing the > FileSystem > > > > > > connector > > > > > > > in > > > > > > > >>> Table. > > > > > > > >>> > > > > > > > >>> I have the same question with Piotr. From my side, I > > > think > > > > it > > > > > > > >>> should be better to be able to reuse existing > > > StreamingFileSink. > > > > I > > > > > > > think We > > > > > > > >>> have began > > > > > > > >>> enhancing the supported FileFormat (e.g., ORC, > Avro...), > > > > and > > > > > > > >>> reusing StreamFileSink should be able to avoid repeat work > in > > > the > > > > > > Table > > > > > > > >>> library. 
Besides, > > > > > > > >>> the bucket concept seems also matches the semantics > of > > > > > > partition. > > > > > > > >>> > > > > > > > >>> For the notification of adding partitions, I'm a > little > > > > > > wondering > > > > > > > >>> that the Watermark mechanism might not be enough since > > > > > > Bucket/Partition > > > > > > > >>> might spans > > > > > > > >>> multiple subtasks. It depends on the level of > > > notification: > > > > > if > > > > > > we > > > > > > > >>> want to notify for the bucket on each subtask, using > watermark > > > to > > > > > > > notifying > > > > > > > >>> each subtask > > > > > > > >>> should be ok, but if we want to notifying for the > whole > > > > > > > >>> Bucket/Partition, we might need to also do some > coordination > > > > > between > > > > > > > >>> subtasks. > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> Best, > > > > > > > >>> Yun > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > ------------------------------------------------------------------ > > > > > > > >>> From:Piotr Nowojski <[hidden email]> > > > > > > > >>> Send Time:2020 Mar. 13 (Fri.) 18:03 > > > > > > > >>> To:dev <[hidden email]> > > > > > > > >>> Cc:user <[hidden email]>; user-zh < > > > > [hidden email] > > > > > > > > > > > > > >>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in > Table > > > > > > > >>> > > > > > > > >>> Hi, > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> Which actual sinks/sources are you planning to use in this > > > > feature? > > > > > > Is > > > > > > > it about exposing StreamingFileSink in the Table API? Or do you > > > want > > > > to > > > > > > > implement new Sinks/Sources? > > > > > > > >>> > > > > > > > >>> Piotrek > > > > > > > >>> > > > > > > > >>>> On 13 Mar 2020, at 10:04, jinhai wang < > [hidden email]> > > > > > wrote: > > > > > > > >>>> > > > > > > > >>> > > > > > > > >>>> Thanks for FLIP-115. 
It is really useful feature for > platform > > > > > > > developers who manage hundreds of Flink to Hive jobs in > production. > > > > > > > >>> > > > > > > > >>>> I think we need add 'connector.sink.username' for > > > > > > > UserGroupInformation when data is written to HDFS > > > > > > > >>>> > > > > > > > >>>> > > > > > > > >>>> 在 2020/3/13 下午3:33,“Jingsong Li”<[hidden email]> > 写入: > > > > > > > >>>> > > > > > > > >>>> Hi everyone, > > > > > > > >>>> > > > > > > > >>> > > > > > > > >>>> I'd like to start a discussion about FLIP-115 Filesystem > > > > > connector > > > > > > > in Table > > > > > > > >>>> [1]. > > > > > > > >>>> This FLIP will bring: > > > > > > > >>>> - Introduce Filesystem table factory in table, support > > > > > > > >>>> csv/parquet/orc/json/avro formats. > > > > > > > >>>> - Introduce streaming filesystem/hive sink in table > > > > > > > >>>> > > > > > > > >>> > > > > > > > >>>> CC to user mail list, if you have any unmet needs, > please > > > feel > > > > > > free > > > > > > > to > > > > > > > >>>> reply~ > > > > > > > >>>> > > > > > > > >>>> Look forward to hearing from you. > > > > > > > >>>> > > > > > > > >>>> [1] > > > > > > > >>>> > > > > > > > >>> > > > > > > > > > > > > > > > > > > > > > > > > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table > > > > > > > >>>> > > > > > > > >>>> Best, > > > > > > > >>>> Jingsong Lee > > > > > > > >>>> > > > > > > > >>>> > > > > > > > >>>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >>> > > > > > > > >> > > > > > > > >> -- > > > > > > > >> Best, Jingsong Lee > > > > > > > >> > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > > > > > > > > > > > > > -- > > > > > > Best, Jingsong Lee > > > > > > > > > > > > > > > > > > > > > > > > -- > > > Best, Jingsong Lee > > > > |
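The partition-notification question quoted above (a bucket/partition may span several subtasks, so per-subtask watermarks alone may not be enough) can be illustrated with a small sketch. This is only a toy model under stated assumptions, not Flink code: `PartitionCommitCoordinator`, `register_partition` and `report_watermark` are hypothetical names, and the coordinator stands in for the single point (the JobMaster, in the thread's wording) that would talk to the metastore.

```python
from typing import Callable, Dict, List


class PartitionCommitCoordinator:
    """Hypothetical single-point coordinator (not a Flink class).

    A partition is committed only once every subtask's watermark has
    reached the partition's end time, so no subtask can still write to it.
    """

    def __init__(self, num_subtasks: int, commit: Callable[[str], None]):
        self.watermarks: List[int] = [-1] * num_subtasks  # last watermark per subtask
        self.pending: Dict[str, int] = {}                 # partition -> end timestamp
        self.commit = commit                              # e.g. a metastore update

    def register_partition(self, partition: str, end_ts: int) -> None:
        self.pending.setdefault(partition, end_ts)

    def report_watermark(self, subtask: int, watermark: int) -> List[str]:
        """Record a watermark; commit and return the partitions that became complete."""
        self.watermarks[subtask] = max(self.watermarks[subtask], watermark)
        global_watermark = min(self.watermarks)
        ready = sorted(p for p, end in self.pending.items() if end <= global_watermark)
        for partition in ready:
            del self.pending[partition]
            self.commit(partition)
        return ready


committed: List[str] = []
coordinator = PartitionCommitCoordinator(num_subtasks=2, commit=committed.append)
coordinator.register_partition("dt=2020-03-13/hour=10", end_ts=11)
coordinator.register_partition("dt=2020-03-13/hour=11", end_ts=12)

first = coordinator.report_watermark(0, 12)   # subtask 1 has not reported yet
second = coordinator.report_watermark(1, 11)  # the minimum watermark is now 11
```

In this sketch nothing is committed after the first report, because the slowest subtask decides; only the second report releases the `hour=10` partition. Routing the commit through one callback is also what keeps the metastore accessed from a single place.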
Hi Kurt,

+1 for having an offline discussion on this topic.

But I think the question of whether to use StreamingFileSink or to implement a subset of its features from scratch is quite a fundamental design decision, with an impact on the behaviour of the public API. So I wouldn't dismiss it as a technical detail; it should be included as part of the FLIP (I know it could be argued in the opposite direction).

Piotrek

> On 18 Mar 2020, at 13:55, Kurt Young <[hidden email]> wrote:
>
> Hi all,
>
> Thanks for the discussion and feedback. I think this FLIP doesn't imply the implementation of such a connector yet; it only describes the functionality and expected behaviors from the user's perspective. Reusing the current StreamingFileSink is definitely one of the possible ways to implement it. Since there are lots of details, I would suggest we have an offline meeting to discuss how these could be achieved by extending StreamingFileSink, and how much effort we need to put into it. What do you think?
>
> Best,
> Kurt
>
> On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <[hidden email]> wrote:
>
>> Hi all,
>>
>> I also agree with Stephan on this!
>>
>> It has been more than a year now that most of our efforts have had the "unify" / "unification" / etc. either in their title or at their core, and this has been the focus of all our resources. By deviating from this now, we only put more stress on other teams in the future. When the users start using a given API, with high probability, they will ask (and it is totally reasonable) for consistent behaviour from all the other APIs that ship with Flink. This will eventually lead to having to answer the questions that we now deem difficult in a future release, when we will have to "unify" again.
>>
>> In addition, Hive integration is definitely a "nice to have" feature but it does not mean that we need to push for 100% compatibility if it is not required.
>> @Jingsong Li if you think that Parquet and Orc are the main formats, >> we can focus on these and provide good support for them (both reading >> and writing). For maintainability, I think that given the amount of >> demand for these formats, it is not going to be a huge problem at >> least for now. >> >> Given the above, I am also leaning towards a solution that aims at >> extending the StreamingFileSink to efficiently support bulk formats >> like Parquet and Orc, rather than creating a new sink that locks >> Hive-dependent usecases to a specific API. >> >> Cheers, >> Kostas >> >> >> >> >> >> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <[hidden email]> wrote: >>> >>>>> The FLIP is "Filesystem connector in Table", it's about building up >>> Flink Table's capabilities. >>> >>> That is exactly what worries me. The whole effort is not thinking about >>> Flink as a whole any more. >>> This proposal is not trying to build a consistent user experience across >>> batch and streaming, across Table API, SQL, and DataStream. >>> >>> The proposal is building a separate, disconnected ecosystem for the Table >>> API, specific to batch processing and some limited streaming setups. >>> Specific to one type of environment (Hive and HDFS). It willingly omits >>> support for other environments and conflicts with efforts in other >>> components to unify. >>> >>> Supporting common use cases is good, but in my opinion not at the price >> of >>> creating a "fractured" project where the approaches in different layers >>> don't fit together any more. >>> >>> >>>> *## StreamingFileSink not support writer with path* >>>> >>>> The FLIP is "Filesystem connector in Table", it's about building up >> Flink >>>> Table's capabilities. But I think Hive is important, I see that most >> users >>>> use Flink and Spark to write data from Kafka to hive. Streaming >> writing, I >>>> see that these two engines are convenient and popular. 
I mean, Flink >> is not >>>> only a hive runtime, but also an important part of offline data >> warehouse. >>>> The thing is StreamingFileSink not support hadoop record writers. Yes, >> we >>>> can support them one by one. I see the community integrating ORC [1]. >> But >>>> it's really not an easy thing. And we have to be careful to maintain >>>> compatibility. After all, users downstream use other computing engines >> to >>>> analyze. >>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is good to >>>> subsequent optimization [2]. But there are many cases. It is enough for >>>> users to generate new files at the checkpoint. They pay more attention >> to >>>> whether they can do it and whether there is a risk of compatibility. >>>> Therefore, RecordWriter is used here. >>>> >>>> *## External HDFS access* >>>> >>>> Including hadoop configuration and Kerberos related things. >>>> >>>> *## Partition commit* >>>> >>>> Committing a partition is to notify the downstream application that the >>>> partition has finished writing, the partition is ready to be read.The >>>> common way is to add a success file or update metastore. Of course, >> there >>>> are other ways to notify. We need to provide flexible mechanisms. >>>> As you mentioned, yes, we can extend "StreamingFileSink" for this part. >>>> >>>> *## Batch / Streaming Unification* >>>> >>>> Yes, it is about exactly-once and single commit at the end, There are >> also >>>> some "bounded" differences. For example, batch can support sorting. In >> this >>>> way, you can sort by partition, which can reduce the number of writers >>>> written at the same time. Dynamic partition writing in batch may >> produce >>>> many unordered partitions. 
>>>> >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114 >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499 >>>> >>>> Best, >>>> Jingsong Lee >>>> >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <[hidden email]> >>>> wrote: >>>> >>>>> Hi Jingsong , >>>>> >>>>> I am looking forward this feature. Because in some streaming >>>> application,it >>>>> need transfer their messages to hdfs , in order to offline analysis. >>>>> >>>>> Best wishes, >>>>> LakeShen >>>>> >>>>> Stephan Ewen <[hidden email]> 于2020年3月17日周二 下午7:42写道: >>>>> >>>>>> I would really like to see us converging the stack and the >>>> functionality >>>>>> here. >>>>>> Meaning to try and use the same sinks in the Table API as for the >>>>>> DataStream API, and using the same sink for batch and streaming. >>>>>> >>>>>> The StreamingFileSink has a lot of things that can help with that. >> If >>>>>> possible, it would be nice to extend it (which would help move >> towards >>>>> the >>>>>> above goal) rather than build a second sink. Building a second sink >>>> leads >>>>>> us further away from unification. >>>>>> >>>>>> I am a bit puzzled by the statement that sinks are primarily for >> Hive. >>>>> The >>>>>> Table API should not be coupled to Hive, it should be an >> independent >>>>>> batch/streaming API for many use cases, supporting very well for >> batch >>>>> and >>>>>> streaming interplay. Supporting Hive is great, but we should not be >>>>>> building this towards Hive, as just yet another Hive runtime. Why >> "yet >>>>>> another Hive runtime" when what we have a unique streaming engine >> that >>>>> can >>>>>> do much more? We would drop our own strength and reduce ourselves >> to a >>>>>> limited subset. >>>>>> >>>>>> Let's build a File Sink that can also support Hive, but can do so >> much >>>>>> more. For example, efficient streaming file ingestion as >> materialized >>>>> views >>>>>> from changelogs. 
>>>>>> >>>>>> >>>>>> *## Writing Files in Streaming* >>>>>> >>>>>> To write files in streaming, I don't see another way than using the >>>>>> streaming file sink. If you want to write files across checkpoints, >>>>> support >>>>>> exactly-once, and support consistent "stop with savepoint", it is >> not >>>>>> trivial. >>>>>> >>>>>> A part of the complexity comes from the fact that not all targets >> are >>>>>> actually file systems, and not all have simple semantics for >>>> persistence. >>>>>> S3 for example does not support renames (only copies, which may >> take a >>>>> lot >>>>>> of time) and it does not support flush/sync of data (the S3 file >> system >>>>> in >>>>>> Hadoop exposes that but it does not work. flush/sync, followed by a >>>>>> failure, leads to data loss). You need to devise a separate >> protocol >>>> for >>>>>> that, which is exactly what has already been done and abstracted >> behind >>>>> the >>>>>> recoverable writers. >>>>>> >>>>>> If you re-engineer that in the, you will end up either missing many >>>>> things >>>>>> (intermediate persistence on different file systems, and atomic >> commit >>>> in >>>>>> the absence of renames, etc.), or you end up doing something >> similar as >>>>> the >>>>>> recoverable writers do. >>>>>> >>>>>> >>>>>> *## Atomic Commit in Batch* >>>>>> >>>>>> For batch sinks, it is also desirable to write the data first and >> then >>>>>> atomically commit it once the job is done. >>>>>> Hadoop has spent a lot of time making this work, see this doc here, >>>>>> specifically the section on 'The "Magic" Committer'. [1] >>>>>> >>>>>> What Flink has built in the RecoverableWriter is in some way an >> even >>>>> better >>>>>> version of this, because it works without extra files (we pass data >>>>> through >>>>>> checkpoint state) and it supports not only committing once at the >> end, >>>>> but >>>>>> committing multiple time intermediate parts during checkpoints. 
>>>>>> >>>>>> Meaning using the recoverable writer mechanism in batch would >> allow us >>>> to >>>>>> immediately get the efficient atomic commit implementations on >> file:// >>>>>> hdfs:// and s3://, with a well defined way to implement it also for >>>> other >>>>>> file systems. >>>>>> >>>>>> >>>>>> *## Batch / Streaming Unification* >>>>>> >>>>>> It would be great to start looking at these things in the same way: >>>>>> - streaming (exactly-once): commits files (after finished) at the >>>> next >>>>>> checkpoint >>>>>> - batch: single commit at the end of the job >>>>>> >>>>>> >>>>>> *## DataStream / Table API Stack Unification* >>>>>> >>>>>> Having the same set of capabilities would make it much easier for >> users >>>>> to >>>>>> understand the system. >>>>>> Especially when it comes to consistent behavior across external >>>> systems. >>>>>> Having a different file sink in Table API and DataStream API means >> that >>>>>> DataStream can write correctly to S3 while Table API cannot. >>>>>> >>>>>> >>>>>> *## What is missing?* >>>>>> >>>>>> It seems there are some things that get in the way of naturally >>>>>> Can you make a list of what features are missing in the >>>> StreamingFileSink >>>>>> that make it usable for the use cases you have in mind? >>>>>> >>>>>> Best, >>>>>> Stephan >>>>>> >>>>>> [1] >>>>>> >>>>>> >>>>> >>>> >> https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html >>>>>> >>>>>> >>>>>> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li < >> [hidden email]> >>>>>> wrote: >>>>>> >>>>>>> Hi Piotr, >>>>>>> >>>>>>> I am very entangled. >>>>>>> >>>>>>> Let me re-list the table streaming sink requirements: >>>>>>> - In table, maybe 90% sinks are for Hive. The parquet and orc >> are the >>>>>> most >>>>>>> important formats. 
Hive provide RecordWriters, it is easy to >> support >>>>> all >>>>>>> hive formats by using it, and we don't need concern hive version >>>>>>> compatibility too, but it can not work with FSDataOutputStream. >>>>>>> - Hive table maybe use external HDFS. It means, hive has its own >>>> hadoop >>>>>>> configuration. >>>>>>> - In table, partition commit is needed, we can not just move >> files, >>>> it >>>>> is >>>>>>> important to complete table semantics to update catalog. >>>>>>> >>>>>>> You are right DataStream and Table streaming sink will not be >> fully >>>>>>> compatible, each with its own set of limitations, quirks and >>>> features. >>>>>>> But if re-using DataStream, batch and streaming also will not be >>>> fully >>>>>>> compatible. Provide a unify experience to batch and streaming is >> also >>>>>>> important. >>>>>>> >>>>>>> Table and DataStream have different concerns, and they tilt in >>>>> different >>>>>>> directions. >>>>>>> >>>>>>> Of course, it is very good to see a unify implementation to solve >>>> batch >>>>>>> sink and hive things, unify DataStream batch sink and DataStream >>>>>> streaming >>>>>>> sink and Table batch sink and Table streaming sink. >>>>>>> >>>>>>> Le's see what others think. >>>>>>> >>>>>>> Best, >>>>>>> Jingsong Lee >>>>>>> >>>>>>> >>>>>>> On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski < >> [hidden email]> >>>>>>> wrote: >>>>>>> >>>>>>>> Hi Jingsong, >>>>>>>> >>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has >> handled >>>> the >>>>>>>> partition and metastore logic well. >>>>>>>>> - unify batch and streaming >>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat. >>>>>>>>> - Add exactly-once related logic. Just 200+ lines code. >>>>>>>>> - It's natural to support more table features, like partition >>>>> commit, >>>>>>>> auto compact and etc.. >>>>>>>>> >>>>>>>>> Second way is reusing Datastream StreamingFileSink: >>>>>>>>> - unify streaming sink between table and Datastream. 
>>>>>>>>> - It maybe hard to introduce table related features to >>>>>>> StreamingFileSink. >>>>>>>>> >>>>>>>>> I prefer the first way a little. What do you think? >>>>>>>> >>>>>>>> I would be surprised if adding “exactly-once related logic” is >> just >>>>> 200 >>>>>>>> lines of code. There are things like multi part file upload to >> s3 >>>> and >>>>>>> there >>>>>>>> are also some pending features like [1]. I would suggest to >>>>> ask/involve >>>>>>>> Klou in this discussion. >>>>>>>> >>>>>>>> If it’s as easy to support exactly-once streaming with current >>>> batch >>>>>>> sink, >>>>>>>> that begs the question, why do we need to maintain >>>> StreamingFileSink? >>>>>>>> >>>>>>>> The worst possible outcome from my perspective will be, if we >> have >>>>>>> another >>>>>>>> example of an operator/logic implemented independently both in >>>>>> DataStream >>>>>>>> API and Table API. Because I’m pretty sure they will not be >> fully >>>>>>>> compatible, each with it’s own set of limitations, quirks and >>>>> features. >>>>>>>> Especially that we have on our long term roadmap and wish list >> to >>>>> unify >>>>>>>> such kind of operators. >>>>>>>> >>>>>>>> Piotrek >>>>>>>> >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499 < >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11499> >>>>>>>> >>>>>>>>> On 16 Mar 2020, at 06:55, Jingsong Li < >> [hidden email]> >>>>>> wrote: >>>>>>>>> >>>>>>>>> Thanks Jinhai for involving. >>>>>>>>> >>>>>>>>>> we need add 'connector.sink.username' for >> UserGroupInformation >>>>> when >>>>>>> data >>>>>>>>> is written to HDFS >>>>>>>>> >>>>>>>>> Yes, I am not an expert of HDFS, but it seems we need do this >>>>> "doAs" >>>>>> in >>>>>>>> the >>>>>>>>> code for access external HDFS. I will update document. >>>>>>>>> >>>>>>>>> Best, >>>>>>>>> Jingsong Lee >>>>>>>>> >>>>>>>>> On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li < >>>>> [hidden email] >>>>>>> >>>>>>>> wrote: >>>>>>>>> >>>>>>>>>> Thanks Piotr and Yun for involving. 
>>>>>>>>>> >>>>>>>>>> Hi Piotr and Yun, for implementation, >>>>>>>>>> >>>>>>>>>> FLINK-14254 [1] introduce batch sink table world, it deals >> with >>>>>>>> partitions >>>>>>>>>> thing, metastore thing and etc.. And it just reuse >>>>>> Dataset/Datastream >>>>>>>>>> FileInputFormat and FileOutputFormat. Filesystem can not do >>>>> without >>>>>>>>>> FileInputFormat, because it need deal with file things, >> split >>>>>> things. >>>>>>>> Like >>>>>>>>>> orc and parquet, they need read whole file and have >> different >>>>> split >>>>>>>> logic. >>>>>>>>>> >>>>>>>>>> So back to file system connector: >>>>>>>>>> - It needs introducing FilesystemTableFactory, >>>>> FilesystemTableSource >>>>>>> and >>>>>>>>>> FilesystemTableSink. >>>>>>>>>> - For sources, reusing Dataset/Datastream FileInputFormats, >>>> there >>>>>> are >>>>>>> no >>>>>>>>>> other interface to finish file reading. >>>>>>>>>> >>>>>>>>>> For file sinks: >>>>>>>>>> - Batch sink use FLINK-14254 >>>>>>>>>> - Streaming sink has two ways. >>>>>>>>>> >>>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has >> handled >>>> the >>>>>>>>>> partition and metastore logic well. >>>>>>>>>> - unify batch and streaming >>>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat. >>>>>>>>>> - Add exactly-once related logic. Just 200+ lines code. >>>>>>>>>> - It's natural to support more table features, like >> partition >>>>>> commit, >>>>>>>> auto >>>>>>>>>> compact and etc.. >>>>>>>>>> >>>>>>>>>> Second way is reusing Datastream StreamingFileSink: >>>>>>>>>> - unify streaming sink between table and Datastream. >>>>>>>>>> - It maybe hard to introduce table related features to >>>>>>>> StreamingFileSink. >>>>>>>>>> >>>>>>>>>> I prefer the first way a little. What do you think? >>>>>>>>>> >>>>>>>>>> Hi Yun, >>>>>>>>>> >>>>>>>>>>> Watermark mechanism might not be enough. >>>>>>>>>> >>>>>>>>>> Watermarks of subtasks are the same in the "snapshotState". 
>>>>>>>>>> >>>>>>>>>>> we might need to also do some coordination between >> subtasks. >>>>>>>>>> >>>>>>>>>> Yes, JobMaster is the role to control subtasks. Metastore >> is a >>>>> very >>>>>>>>>> fragile single point, which can not be accessed by >> distributed, >>>> so >>>>>> it >>>>>>> is >>>>>>>>>> uniformly accessed by JobMaster. >>>>>>>>>> >>>>>>>>>> [1]https://issues.apache.org/jira/browse/FLINK-14254 >>>>>>>>>> >>>>>>>>>> Best, >>>>>>>>>> Jingsong Lee >>>>>>>>>> >>>>>>>>>> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao < >> [hidden email]> >>>>>> wrote: >>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> Very thanks for Jinsong to bring up this discussion! >> It >>>>>> should >>>>>>>>>>> largely improve the usability after enhancing the >> FileSystem >>>>>>> connector >>>>>>>> in >>>>>>>>>>> Table. >>>>>>>>>>> >>>>>>>>>>> I have the same question with Piotr. From my side, I >>>> think >>>>> it >>>>>>>>>>> should be better to be able to reuse existing >>>> StreamingFileSink. >>>>> I >>>>>>>> think We >>>>>>>>>>> have began >>>>>>>>>>> enhancing the supported FileFormat (e.g., ORC, >> Avro...), >>>>> and >>>>>>>>>>> reusing StreamFileSink should be able to avoid repeat work >> in >>>> the >>>>>>> Table >>>>>>>>>>> library. Besides, >>>>>>>>>>> the bucket concept seems also matches the semantics >> of >>>>>>> partition. >>>>>>>>>>> >>>>>>>>>>> For the notification of adding partitions, I'm a >> little >>>>>>> wondering >>>>>>>>>>> that the Watermark mechanism might not be enough since >>>>>>> Bucket/Partition >>>>>>>>>>> might spans >>>>>>>>>>> multiple subtasks. It depends on the level of >>>> notification: >>>>>> if >>>>>>> we >>>>>>>>>>> want to notify for the bucket on each subtask, using >> watermark >>>> to >>>>>>>> notifying >>>>>>>>>>> each subtask >>>>>>>>>>> should be ok, but if we want to notifying for the >> whole >>>>>>>>>>> Bucket/Partition, we might need to also do some >> coordination >>>>>> between >>>>>>>>>>> subtasks. 
>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Best, >>>>>>>>>>> Yun >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>> ------------------------------------------------------------------ >>>>>>>>>>> From:Piotr Nowojski <[hidden email]> >>>>>>>>>>> Send Time:2020 Mar. 13 (Fri.) 18:03 >>>>>>>>>>> To:dev <[hidden email]> >>>>>>>>>>> Cc:user <[hidden email]>; user-zh < >>>>> [hidden email] >>>>>>> >>>>>>>>>>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in >> Table >>>>>>>>>>> >>>>>>>>>>> Hi, >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> Which actual sinks/sources are you planning to use in this >>>>> feature? >>>>>>> Is >>>>>>>> it about exposing StreamingFileSink in the Table API? Or do you >>>> want >>>>> to >>>>>>>> implement new Sinks/Sources? >>>>>>>>>>> >>>>>>>>>>> Piotrek >>>>>>>>>>> >>>>>>>>>>>> On 13 Mar 2020, at 10:04, jinhai wang < >> [hidden email]> >>>>>> wrote: >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> Thanks for FLIP-115. It is really useful feature for >> platform >>>>>>>> developers who manage hundreds of Flink to Hive jobs in >> production. >>>>>>>>>>> >>>>>>>>>>>> I think we need add 'connector.sink.username' for >>>>>>>> UserGroupInformation when data is written to HDFS >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> 在 2020/3/13 下午3:33,“Jingsong Li”<[hidden email]> >> 写入: >>>>>>>>>>>> >>>>>>>>>>>> Hi everyone, >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> I'd like to start a discussion about FLIP-115 Filesystem >>>>>> connector >>>>>>>> in Table >>>>>>>>>>>> [1]. >>>>>>>>>>>> This FLIP will bring: >>>>>>>>>>>> - Introduce Filesystem table factory in table, support >>>>>>>>>>>> csv/parquet/orc/json/avro formats. >>>>>>>>>>>> - Introduce streaming filesystem/hive sink in table >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>>> CC to user mail list, if you have any unmet needs, >> please >>>> feel >>>>>>> free >>>>>>>> to >>>>>>>>>>>> reply~ >>>>>>>>>>>> >>>>>>>>>>>> Look forward to hearing from you. 
>>>>>>>>>>>> >>>>>>>>>>>> [1] >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>> >>>>>>> >>>>>> >>>>> >>>> >> https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table >>>>>>>>>>>> >>>>>>>>>>>> Best, >>>>>>>>>>>> Jingsong Lee >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>>> >>>>>>>>>> >>>>>>>>>> -- >>>>>>>>>> Best, Jingsong Lee >>>>>>>>>> >>>>>>>>> >>>>>>>>> >>>>>>>>> -- >>>>>>>>> Best, Jingsong Lee >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> -- >>>>>>> Best, Jingsong Lee >>>>>>> >>>>>> >>>>> >>>> >>>> >>>> -- >>>> Best, Jingsong Lee >>>> >> |
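The "Batch / Streaming Unification" point quoted above (streaming commits finished files at the next checkpoint, batch does a single commit at the end of the job) can be sketched as one sink with two commit triggers. This is a toy model under stated assumptions, not the StreamingFileSink implementation: the class `CheckpointedFileSink`, its methods and the `END_OF_JOB` marker are hypothetical, and real recoverable writers also have to deal with persistence, S3 multipart uploads and recovery, which are left out here.

```python
from typing import Dict, List

END_OF_JOB = 2**63 - 1  # hypothetical marker used as the "final checkpoint" id


class CheckpointedFileSink:
    """Toy model: files move from in-progress to pending to committed."""

    def __init__(self) -> None:
        self.in_progress: List[str] = []
        self.pending: Dict[int, List[str]] = {}  # checkpoint id -> files awaiting commit
        self.committed: List[str] = []           # files visible to downstream readers

    def write_file(self, name: str) -> None:
        self.in_progress.append(name)

    def snapshot_state(self, checkpoint_id: int) -> None:
        """On a checkpoint: close the current files and remember them as pending."""
        self.pending[checkpoint_id] = self.in_progress
        self.in_progress = []

    def notify_checkpoint_complete(self, checkpoint_id: int) -> None:
        """Commit everything pending up to and including this checkpoint."""
        for cid in sorted(c for c in self.pending if c <= checkpoint_id):
            self.committed.extend(self.pending.pop(cid))

    def end_of_input(self) -> None:
        """Batch case: treat the end of the job as one final checkpoint."""
        self.snapshot_state(END_OF_JOB)
        self.notify_checkpoint_complete(END_OF_JOB)


# Streaming: a file becomes visible only after its checkpoint completes.
streaming = CheckpointedFileSink()
streaming.write_file("part-0-0")
streaming.snapshot_state(1)
streaming.write_file("part-0-1")
visible_before = list(streaming.committed)
streaming.notify_checkpoint_complete(1)
visible_after = list(streaming.committed)

# Batch: the same sink, with a single commit at the end.
batch = CheckpointedFileSink()
batch.write_file("part-0-0")
batch.write_file("part-0-1")
batch.end_of_input()
```

The design choice the sketch tries to show is that batch is just the degenerate case with one commit point, which is the argument made in the thread for sharing one sink between batch and streaming.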
Hi Stephan, Kostas and Piotrek, thanks for these inputs.

Maybe what I expressed was not clear. On the implementation, I wanted to know what you think, rather than insisting on building another set from scratch.

Piotrek, you are right: the implementation is part of this FLIP too. We cannot list every detail in the FLIP, so the implementation does affect the behavior users see. Maintenance and development costs also matter.

I think you have already persuaded me. I am now thinking about building on StreamingFileSink. Extending StreamingFileSink can solve the "partition commit" requirement; I have tried this in my POC. And it is true that the recoverable writers and S3 support are also important. That is why I listed "What is missing" for StreamingFileSink in my previous mail. (It was not a defense of making another set from scratch.)

Hi Stephan,

>> The FLIP is "Filesystem connector in Table", it's about building up Flink Table's capabilities.

What I mean is that this is not just for Hive; this FLIP is for Table. So we don't need to do everything for Hive. But Hive is also a "format" (or something else) of the Filesystem connector, and its requirements can be considered.

I think you are right about the design, and let me take this seriously: a unified way makes us stronger, with less confusion, fewer surprises and a more rigorous design. And I am pretty sure the table-related work is good for enhancing the DataStream API too.

Hi Kostas,

Yes, Parquet and Orc are the main formats. Good to know your support~

I think the streaming file sink and the file system connector are important to Flink. It is a good time to think about these common requirements and about batch support; this is not just about Table, it is for the whole of Flink too. If some requirements are hard to support or violate the existing design, we can exclude them.

Best,
Jingsong Lee

On Wed, Mar 18, 2020 at 10:31 PM Piotr Nowojski <[hidden email]> wrote:

> Hi Kurt,
>
> +1 for having some offline discussion on this topic.
> > But I think the question about using StreamingFileSink or implementing > subset of it’s feature from scratch is quite fundamental design decision, > with impact on the behaviour of Public API, so I wouldn’t discard it as > technical detail and should be included as part of the FLIP (I know It > could be argued in opposite direction). > > Piotrek > > > On 18 Mar 2020, at 13:55, Kurt Young <[hidden email]> wrote: > > > > Hi all, > > > > Thanks for the discuss and feedbacks. I think this FLIP doesn't imply the > > implementation > > of such connector yet, it only describes the functionality and expected > > behaviors from user's > > perspective. Reusing current StreamingFileSink is definitely one of the > > possible ways to > > implement it. Since there are lots of details and I would suggest we can > > have an offline meeting > > to discuss the how these could be achieved by extending StremingFileSink, > > and how much > > effort we need to put on it. What do you think? > > > > Best, > > Kurt > > > > > > On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <[hidden email]> > wrote: > > > >> Hi all, > >> > >> I also agree with Stephan on this! > >> > >> It has been more than a year now that most of our efforts have had the > >> "unify" / "unification"/ etc either on their title or in their core > >> and this has been the focus of all our resources. By deviating from > >> this now, we only put more stress on other teams in the future. When > >> the users start using a given API, with high probability, they will > >> ask (and it is totally reasonable) consistent behaviour from all the > >> other APIs that ship with Flink. This will eventually lead to having > >> to answer the questions that we now deem as difficult in a future > >> release, when we will have to "unify" again. > >> > >> In addition, Hive integration is definitely a "nice to have" feature > >> but it does not mean that we need to push for 100% compatibility if it > >> is not required. 
> >> @Jingsong Li if you think that Parquet and Orc are the main formats, > >> we can focus on these and provide good support for them (both reading > >> and writing). For maintainability, I think that given the amount of > >> demand for these formats, it is not going to be a huge problem at > >> least for now. > >> > >> Given the above, I am also leaning towards a solution that aims at > >> extending the StreamingFileSink to efficiently support bulk formats > >> like Parquet and Orc, rather than creating a new sink that locks > >> Hive-dependent usecases to a specific API. > >> > >> Cheers, > >> Kostas > >> > >> > >> > >> > >> > >> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <[hidden email]> wrote: > >>> > >>>>> The FLIP is "Filesystem connector in Table", it's about building up > >>> Flink Table's capabilities. > >>> > >>> That is exactly what worries me. The whole effort is not thinking about > >>> Flink as a whole any more. > >>> This proposal is not trying to build a consistent user experience > across > >>> batch and streaming, across Table API, SQL, and DataStream. > >>> > >>> The proposal is building a separate, disconnected ecosystem for the > Table > >>> API, specific to batch processing and some limited streaming setups. > >>> Specific to one type of environment (Hive and HDFS). It willingly omits > >>> support for other environments and conflicts with efforts in other > >>> components to unify. > >>> > >>> Supporting common use cases is good, but in my opinion not at the price > >> of > >>> creating a "fractured" project where the approaches in different layers > >>> don't fit together any more. > >>> > >>> > >>>> *## StreamingFileSink not support writer with path* > >>>> > >>>> The FLIP is "Filesystem connector in Table", it's about building up > >> Flink > >>>> Table's capabilities. But I think Hive is important, I see that most > >> users > >>>> use Flink and Spark to write data from Kafka to hive. 
Streaming > >> writing, I > >>>> see that these two engines are convenient and popular. I mean, Flink > >> is not > >>>> only a hive runtime, but also an important part of offline data > >> warehouse. > >>>> The thing is StreamingFileSink not support hadoop record writers. Yes, > >> we > >>>> can support them one by one. I see the community integrating ORC [1]. > >> But > >>>> it's really not an easy thing. And we have to be careful to maintain > >>>> compatibility. After all, users downstream use other computing engines > >> to > >>>> analyze. > >>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is good to > >>>> subsequent optimization [2]. But there are many cases. It is enough > for > >>>> users to generate new files at the checkpoint. They pay more attention > >> to > >>>> whether they can do it and whether there is a risk of compatibility. > >>>> Therefore, RecordWriter is used here. > >>>> > >>>> *## External HDFS access* > >>>> > >>>> Including hadoop configuration and Kerberos related things. > >>>> > >>>> *## Partition commit* > >>>> > >>>> Committing a partition is to notify the downstream application that > the > >>>> partition has finished writing, the partition is ready to be read.The > >>>> common way is to add a success file or update metastore. Of course, > >> there > >>>> are other ways to notify. We need to provide flexible mechanisms. > >>>> As you mentioned, yes, we can extend "StreamingFileSink" for this > part. > >>>> > >>>> *## Batch / Streaming Unification* > >>>> > >>>> Yes, it is about exactly-once and single commit at the end, There are > >> also > >>>> some "bounded" differences. For example, batch can support sorting. In > >> this > >>>> way, you can sort by partition, which can reduce the number of writers > >>>> written at the same time. Dynamic partition writing in batch may > >> produce > >>>> many unordered partitions. 
> >>>> > >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114 > >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499 > >>>> > >>>> Best, > >>>> Jingsong Lee > >>>> > >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <[hidden email]> > >>>> wrote: > >>>> > >>>>> Hi Jingsong , > >>>>> > >>>>> I am looking forward this feature. Because in some streaming > >>>> application,it > >>>>> need transfer their messages to hdfs , in order to offline analysis. > >>>>> > >>>>> Best wishes, > >>>>> LakeShen > >>>>> > >>>>> Stephan Ewen <[hidden email]> 于2020年3月17日周二 下午7:42写道: > >>>>> > >>>>>> I would really like to see us converging the stack and the > >>>> functionality > >>>>>> here. > >>>>>> Meaning to try and use the same sinks in the Table API as for the > >>>>>> DataStream API, and using the same sink for batch and streaming. > >>>>>> > >>>>>> The StreamingFileSink has a lot of things that can help with that. > >> If > >>>>>> possible, it would be nice to extend it (which would help move > >> towards > >>>>> the > >>>>>> above goal) rather than build a second sink. Building a second sink > >>>> leads > >>>>>> us further away from unification. > >>>>>> > >>>>>> I am a bit puzzled by the statement that sinks are primarily for > >> Hive. > >>>>> The > >>>>>> Table API should not be coupled to Hive, it should be an > >> independent > >>>>>> batch/streaming API for many use cases, supporting very well for > >> batch > >>>>> and > >>>>>> streaming interplay. Supporting Hive is great, but we should not be > >>>>>> building this towards Hive, as just yet another Hive runtime. Why > >> "yet > >>>>>> another Hive runtime" when what we have a unique streaming engine > >> that > >>>>> can > >>>>>> do much more? We would drop our own strength and reduce ourselves > >> to a > >>>>>> limited subset. > >>>>>> > >>>>>> Let's build a File Sink that can also support Hive, but can do so > >> much > >>>>>> more. 
For example, efficient streaming file ingestion as > >> materialized > >>>>> views > >>>>>> from changelogs. > >>>>>> > >>>>>> > >>>>>> *## Writing Files in Streaming* > >>>>>> > >>>>>> To write files in streaming, I don't see another way than using the > >>>>>> streaming file sink. If you want to write files across checkpoints, > >>>>> support > >>>>>> exactly-once, and support consistent "stop with savepoint", it is > >> not > >>>>>> trivial. > >>>>>> > >>>>>> A part of the complexity comes from the fact that not all targets > >> are > >>>>>> actually file systems, and not all have simple semantics for > >>>> persistence. > >>>>>> S3 for example does not support renames (only copies, which may > >> take a > >>>>> lot > >>>>>> of time) and it does not support flush/sync of data (the S3 file > >> system > >>>>> in > >>>>>> Hadoop exposes that but it does not work. flush/sync, followed by a > >>>>>> failure, leads to data loss). You need to devise a separate > >> protocol > >>>> for > >>>>>> that, which is exactly what has already been done and abstracted > >> behind > >>>>> the > >>>>>> recoverable writers. > >>>>>> > >>>>>> If you re-engineer that in the, you will end up either missing many > >>>>> things > >>>>>> (intermediate persistence on different file systems, and atomic > >> commit > >>>> in > >>>>>> the absence of renames, etc.), or you end up doing something > >> similar as > >>>>> the > >>>>>> recoverable writers do. > >>>>>> > >>>>>> > >>>>>> *## Atomic Commit in Batch* > >>>>>> > >>>>>> For batch sinks, it is also desirable to write the data first and > >> then > >>>>>> atomically commit it once the job is done. > >>>>>> Hadoop has spent a lot of time making this work, see this doc here, > >>>>>> specifically the section on 'The "Magic" Committer'. 
[1] > >>>>>> > >>>>>> What Flink has built in the RecoverableWriter is in some way an > >> even > >>>>> better > >>>>>> version of this, because it works without extra files (we pass data > >>>>> through > >>>>>> checkpoint state) and it supports not only committing once at the > >> end, > >>>>> but > >>>>>> committing multiple time intermediate parts during checkpoints. > >>>>>> > >>>>>> Meaning using the recoverable writer mechanism in batch would > >> allow us > >>>> to > >>>>>> immediately get the efficient atomic commit implementations on > >> file:// > >>>>>> hdfs:// and s3://, with a well defined way to implement it also for > >>>> other > >>>>>> file systems. > >>>>>> > >>>>>> > >>>>>> *## Batch / Streaming Unification* > >>>>>> > >>>>>> It would be great to start looking at these things in the same way: > >>>>>> - streaming (exactly-once): commits files (after finished) at the > >>>> next > >>>>>> checkpoint > >>>>>> - batch: single commit at the end of the job > >>>>>> > >>>>>> > >>>>>> *## DataStream / Table API Stack Unification* > >>>>>> > >>>>>> Having the same set of capabilities would make it much easier for > >> users > >>>>> to > >>>>>> understand the system. > >>>>>> Especially when it comes to consistent behavior across external > >>>> systems. > >>>>>> Having a different file sink in Table API and DataStream API means > >> that > >>>>>> DataStream can write correctly to S3 while Table API cannot. > >>>>>> > >>>>>> > >>>>>> *## What is missing?* > >>>>>> > >>>>>> It seems there are some things that get in the way of naturally > >>>>>> Can you make a list of what features are missing in the > >>>> StreamingFileSink > >>>>>> that make it usable for the use cases you have in mind? 
> >>>>>> > >>>>>> Best, > >>>>>> Stephan > >>>>>> > >>>>>> [1] > >>>>>> > >>>>>> > >>>>> > >>>> > >> > https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html > >>>>>> > >>>>>> > >>>>>> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li < > >> [hidden email]> > >>>>>> wrote: > >>>>>> > >>>>>>> Hi Piotr, > >>>>>>> > >>>>>>> I am very entangled. > >>>>>>> > >>>>>>> Let me re-list the table streaming sink requirements: > >>>>>>> - In table, maybe 90% sinks are for Hive. The parquet and orc > >> are the > >>>>>> most > >>>>>>> important formats. Hive provide RecordWriters, it is easy to > >> support > >>>>> all > >>>>>>> hive formats by using it, and we don't need concern hive version > >>>>>>> compatibility too, but it can not work with FSDataOutputStream. > >>>>>>> - Hive table maybe use external HDFS. It means, hive has its own > >>>> hadoop > >>>>>>> configuration. > >>>>>>> - In table, partition commit is needed, we can not just move > >> files, > >>>> it > >>>>> is > >>>>>>> important to complete table semantics to update catalog. > >>>>>>> > >>>>>>> You are right DataStream and Table streaming sink will not be > >> fully > >>>>>>> compatible, each with its own set of limitations, quirks and > >>>> features. > >>>>>>> But if re-using DataStream, batch and streaming also will not be > >>>> fully > >>>>>>> compatible. Provide a unify experience to batch and streaming is > >> also > >>>>>>> important. > >>>>>>> > >>>>>>> Table and DataStream have different concerns, and they tilt in > >>>>> different > >>>>>>> directions. > >>>>>>> > >>>>>>> Of course, it is very good to see a unify implementation to solve > >>>> batch > >>>>>>> sink and hive things, unify DataStream batch sink and DataStream > >>>>>> streaming > >>>>>>> sink and Table batch sink and Table streaming sink. > >>>>>>> > >>>>>>> Le's see what others think. 
> >>>>>>> > >>>>>>> Best, > >>>>>>> Jingsong Lee > >>>>>>> > >>>>>>> > >>>>>>> On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski < > >> [hidden email]> > >>>>>>> wrote: > >>>>>>> > >>>>>>>> Hi Jingsong, > >>>>>>>> > >>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has > >> handled > >>>> the > >>>>>>>> partition and metastore logic well. > >>>>>>>>> - unify batch and streaming > >>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat. > >>>>>>>>> - Add exactly-once related logic. Just 200+ lines code. > >>>>>>>>> - It's natural to support more table features, like partition > >>>>> commit, > >>>>>>>> auto compact and etc.. > >>>>>>>>> > >>>>>>>>> Second way is reusing Datastream StreamingFileSink: > >>>>>>>>> - unify streaming sink between table and Datastream. > >>>>>>>>> - It maybe hard to introduce table related features to > >>>>>>> StreamingFileSink. > >>>>>>>>> > >>>>>>>>> I prefer the first way a little. What do you think? > >>>>>>>> > >>>>>>>> I would be surprised if adding “exactly-once related logic” is > >> just > >>>>> 200 > >>>>>>>> lines of code. There are things like multi part file upload to > >> s3 > >>>> and > >>>>>>> there > >>>>>>>> are also some pending features like [1]. I would suggest to > >>>>> ask/involve > >>>>>>>> Klou in this discussion. > >>>>>>>> > >>>>>>>> If it’s as easy to support exactly-once streaming with current > >>>> batch > >>>>>>> sink, > >>>>>>>> that begs the question, why do we need to maintain > >>>> StreamingFileSink? > >>>>>>>> > >>>>>>>> The worst possible outcome from my perspective will be, if we > >> have > >>>>>>> another > >>>>>>>> example of an operator/logic implemented independently both in > >>>>>> DataStream > >>>>>>>> API and Table API. Because I’m pretty sure they will not be > >> fully > >>>>>>>> compatible, each with it’s own set of limitations, quirks and > >>>>> features. 
> >>>>>>>> Especially that we have on our long term roadmap and wish list > >> to > >>>>> unify > >>>>>>>> such kind of operators. > >>>>>>>> > >>>>>>>> Piotrek > >>>>>>>> > >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-11499 < > >>>>>>>> https://issues.apache.org/jira/browse/FLINK-11499> > >>>>>>>> > >>>>>>>>> On 16 Mar 2020, at 06:55, Jingsong Li < > >> [hidden email]> > >>>>>> wrote: > >>>>>>>>> > >>>>>>>>> Thanks Jinhai for involving. > >>>>>>>>> > >>>>>>>>>> we need add 'connector.sink.username' for > >> UserGroupInformation > >>>>> when > >>>>>>> data > >>>>>>>>> is written to HDFS > >>>>>>>>> > >>>>>>>>> Yes, I am not an expert of HDFS, but it seems we need do this > >>>>> "doAs" > >>>>>> in > >>>>>>>> the > >>>>>>>>> code for access external HDFS. I will update document. > >>>>>>>>> > >>>>>>>>> Best, > >>>>>>>>> Jingsong Lee > >>>>>>>>> > >>>>>>>>> On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li < > >>>>> [hidden email] > >>>>>>> > >>>>>>>> wrote: > >>>>>>>>> > >>>>>>>>>> Thanks Piotr and Yun for involving. > >>>>>>>>>> > >>>>>>>>>> Hi Piotr and Yun, for implementation, > >>>>>>>>>> > >>>>>>>>>> FLINK-14254 [1] introduce batch sink table world, it deals > >> with > >>>>>>>> partitions > >>>>>>>>>> thing, metastore thing and etc.. And it just reuse > >>>>>> Dataset/Datastream > >>>>>>>>>> FileInputFormat and FileOutputFormat. Filesystem can not do > >>>>> without > >>>>>>>>>> FileInputFormat, because it need deal with file things, > >> split > >>>>>> things. > >>>>>>>> Like > >>>>>>>>>> orc and parquet, they need read whole file and have > >> different > >>>>> split > >>>>>>>> logic. > >>>>>>>>>> > >>>>>>>>>> So back to file system connector: > >>>>>>>>>> - It needs introducing FilesystemTableFactory, > >>>>> FilesystemTableSource > >>>>>>> and > >>>>>>>>>> FilesystemTableSink. > >>>>>>>>>> - For sources, reusing Dataset/Datastream FileInputFormats, > >>>> there > >>>>>> are > >>>>>>> no > >>>>>>>>>> other interface to finish file reading. 
> >>>>>>>>>> > >>>>>>>>>> For file sinks: > >>>>>>>>>> - Batch sink use FLINK-14254 > >>>>>>>>>> - Streaming sink has two ways. > >>>>>>>>>> > >>>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has > >> handled > >>>> the > >>>>>>>>>> partition and metastore logic well. > >>>>>>>>>> - unify batch and streaming > >>>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat. > >>>>>>>>>> - Add exactly-once related logic. Just 200+ lines code. > >>>>>>>>>> - It's natural to support more table features, like > >> partition > >>>>>> commit, > >>>>>>>> auto > >>>>>>>>>> compact and etc.. > >>>>>>>>>> > >>>>>>>>>> Second way is reusing Datastream StreamingFileSink: > >>>>>>>>>> - unify streaming sink between table and Datastream. > >>>>>>>>>> - It maybe hard to introduce table related features to > >>>>>>>> StreamingFileSink. > >>>>>>>>>> > >>>>>>>>>> I prefer the first way a little. What do you think? > >>>>>>>>>> > >>>>>>>>>> Hi Yun, > >>>>>>>>>> > >>>>>>>>>>> Watermark mechanism might not be enough. > >>>>>>>>>> > >>>>>>>>>> Watermarks of subtasks are the same in the "snapshotState". > >>>>>>>>>> > >>>>>>>>>>> we might need to also do some coordination between > >> subtasks. > >>>>>>>>>> > >>>>>>>>>> Yes, JobMaster is the role to control subtasks. Metastore > >> is a > >>>>> very > >>>>>>>>>> fragile single point, which can not be accessed by > >> distributed, > >>>> so > >>>>>> it > >>>>>>> is > >>>>>>>>>> uniformly accessed by JobMaster. > >>>>>>>>>> > >>>>>>>>>> [1]https://issues.apache.org/jira/browse/FLINK-14254 > >>>>>>>>>> > >>>>>>>>>> Best, > >>>>>>>>>> Jingsong Lee > >>>>>>>>>> > >>>>>>>>>> On Fri, Mar 13, 2020 at 6:43 PM Yun Gao < > >> [hidden email]> > >>>>>> wrote: > >>>>>>>>>> > >>>>>>>>>>> Hi, > >>>>>>>>>>> > >>>>>>>>>>> Very thanks for Jinsong to bring up this discussion! > >> It > >>>>>> should > >>>>>>>>>>> largely improve the usability after enhancing the > >> FileSystem > >>>>>>> connector > >>>>>>>> in > >>>>>>>>>>> Table. 
> >>>>>>>>>>> > >>>>>>>>>>> I have the same question with Piotr. From my side, I > >>>> think > >>>>> it > >>>>>>>>>>> should be better to be able to reuse existing > >>>> StreamingFileSink. > >>>>> I > >>>>>>>> think We > >>>>>>>>>>> have began > >>>>>>>>>>> enhancing the supported FileFormat (e.g., ORC, > >> Avro...), > >>>>> and > >>>>>>>>>>> reusing StreamFileSink should be able to avoid repeat work > >> in > >>>> the > >>>>>>> Table > >>>>>>>>>>> library. Besides, > >>>>>>>>>>> the bucket concept seems also matches the semantics > >> of > >>>>>>> partition. > >>>>>>>>>>> > >>>>>>>>>>> For the notification of adding partitions, I'm a > >> little > >>>>>>> wondering > >>>>>>>>>>> that the Watermark mechanism might not be enough since > >>>>>>> Bucket/Partition > >>>>>>>>>>> might spans > >>>>>>>>>>> multiple subtasks. It depends on the level of > >>>> notification: > >>>>>> if > >>>>>>> we > >>>>>>>>>>> want to notify for the bucket on each subtask, using > >> watermark > >>>> to > >>>>>>>> notifying > >>>>>>>>>>> each subtask > >>>>>>>>>>> should be ok, but if we want to notifying for the > >> whole > >>>>>>>>>>> Bucket/Partition, we might need to also do some > >> coordination > >>>>>> between > >>>>>>>>>>> subtasks. > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Best, > >>>>>>>>>>> Yun > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> > >>>>> ------------------------------------------------------------------ > >>>>>>>>>>> From:Piotr Nowojski <[hidden email]> > >>>>>>>>>>> Send Time:2020 Mar. 13 (Fri.) 18:03 > >>>>>>>>>>> To:dev <[hidden email]> > >>>>>>>>>>> Cc:user <[hidden email]>; user-zh < > >>>>> [hidden email] > >>>>>>> > >>>>>>>>>>> Subject:Re: [DISCUSS] FLIP-115: Filesystem connector in > >> Table > >>>>>>>>>>> > >>>>>>>>>>> Hi, > >>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>> Which actual sinks/sources are you planning to use in this > >>>>> feature? > >>>>>>> Is > >>>>>>>> it about exposing StreamingFileSink in the Table API? 
Or do you > >>>> want > >>>>> to > >>>>>>>> implement new Sinks/Sources? > >>>>>>>>>>> > >>>>>>>>>>> Piotrek > >>>>>>>>>>> > >>>>>>>>>>>> On 13 Mar 2020, at 10:04, jinhai wang < > >> [hidden email]> > >>>>>> wrote: > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>>> Thanks for FLIP-115. It is really useful feature for > >> platform > >>>>>>>> developers who manage hundreds of Flink to Hive jobs in > >> production. > >>>>>>>>>>> > >>>>>>>>>>>> I think we need add 'connector.sink.username' for > >>>>>>>> UserGroupInformation when data is written to HDFS > >>>>>>>>>>>> > >>>>>>>>>>>> > >>>>>>>>>>>> 在 2020/3/13 下午3:33,“Jingsong Li”<[hidden email]> > >> 写入: > >>>>>>>>>>>> > >>>>>>>>>>>> Hi everyone, > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>>> I'd like to start a discussion about FLIP-115 Filesystem > >>>>>> connector > >>>>>>>> in Table > >>>>>>>>>>>> [1]. > >>>>>>>>>>>> This FLIP will bring: > >>>>>>>>>>>> - Introduce Filesystem table factory in table, support > >>>>>>>>>>>> csv/parquet/orc/json/avro formats. > >>>>>>>>>>>> - Introduce streaming filesystem/hive sink in table > >>>>>>>>>>>> > >>>>>>>>>>> > >>>>>>>>>>>> CC to user mail list, if you have any unmet needs, > >> please > >>>> feel > >>>>>>> free > >>>>>>>> to > >>>>>>>>>>>> reply~ > >>>>>>>>>>>> > >>>>>>>>>>>> Look forward to hearing from you. 
> >>>>>>>>>>>>
> >>>>>>>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table
> >>>>>>>>>>>>
> >>>>>>>>>>>> Best,
> >>>>>>>>>>>> Jingsong Lee
> >>>>>>>>>>
> >>>>>>>>>> --
> >>>>>>>>>> Best, Jingsong Lee
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Best, Jingsong Lee
> >>>>>>>
> >>>>>>> --
> >>>>>>> Best, Jingsong Lee
> >>>>
> >>>> --
> >>>> Best, Jingsong Lee

--
Best, Jingsong Lee
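The quoted thread above contrasts two commit models: streaming (finished files are committed at the next checkpoint) and batch (a single commit at the end of the job). As a rough illustration of why one sink could serve both, here is a minimal in-memory sketch. It is not Flink code: `TwoPhaseFileSink`, `run_streaming`, `run_batch`, `max_records` and `checkpoint_every` are all hypothetical names made up for this example, and "files" are just tuples of records.

```python
from dataclasses import dataclass, field


@dataclass
class TwoPhaseFileSink:
    """Toy model (hypothetical, not the Flink API): records go into an
    in-progress file, rolled files wait as 'pending', and only a commit
    makes them visible to downstream readers."""
    max_records: int                                # roll the file at this size
    in_progress: list = field(default_factory=list)
    pending: list = field(default_factory=list)     # finished, not yet visible
    committed: list = field(default_factory=list)   # visible to readers

    def write(self, record):
        self.in_progress.append(record)
        if len(self.in_progress) >= self.max_records:
            self.roll()

    def roll(self):
        """Close the in-progress file and stage it for the next commit."""
        if self.in_progress:
            self.pending.append(tuple(self.in_progress))
            self.in_progress = []

    def commit(self):
        """Publish every pending file in one step."""
        self.committed.extend(self.pending)
        self.pending = []


def run_streaming(records, max_records, checkpoint_every):
    """Streaming: pending files are committed at each simulated checkpoint."""
    sink = TwoPhaseFileSink(max_records)
    for i, record in enumerate(records, start=1):
        sink.write(record)
        if i % checkpoint_every == 0:
            sink.commit()
    return sink


def run_batch(records, max_records):
    """Batch: same sink, but one single commit at the end of the job."""
    sink = TwoPhaseFileSink(max_records)
    for record in records:
        sink.write(record)
    sink.roll()    # close the last, possibly short, file
    sink.commit()
    return sink
```

In this sketch the only difference between the two modes is when `commit()` is called; the write and roll logic is shared. A real implementation would also have to persist the pending state in checkpoints and handle file systems without atomic rename, which is the part the recoverable writers in the thread are said to cover.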
Hi,

I am very interested in this topic and would like to join the offline discussion if possible. You have already raised many inputs and concerns; I would like to share some of my thoughts. Correct me if I have misunderstood.

Flink is a unified engine, so it should provide end-to-end (e2e) exactly-once semantics to users in both streaming and batch. E2E exactly-once is not trivial. StreamingFileSink already does a lot of work to support e2e exactly-once semantics for the “file” output scenario. It provides a layered architecture:

1. BulkWriter/Encoder deals with the data format in a file.
2. BucketAssigner/RollingPolicy deals with the lifecycle of the file and the directory structure.
3. RecoverableWriter deals with the exactly-once semantics.

All these layers are orthogonal and can be combined with each other, which saves a lot of work. (Currently, there are some limitations.)

There are some known issues, such as how to support batch in the StreamingFileSink and how to support ORC, but there are already discussions on how to resolve them. Jingsong also gives some new cases that the StreamingFileSink might not currently support. I am very glad to see that you all agree on improving the StreamingFileSink architecture for these new cases.

Best,
Guowei

Jingsong Li <[hidden email]> wrote on Thu, Mar 19, 2020 at 12:19 AM:

> Hi Stephan & Kostas & Piotrek, thanks for these inputs,
>
> Maybe what I expressed is not clear. For the implementation, I want to know
> what you think, rather than must making another set from scratch. Piotrek
> you are right, implementation is the part of this FLIP too, because we can
> not list all detail things in the FLIP, so the implementation do affect
> user's behaviors. And the maintenance / development costs are also points.
>
> I think you already persuaded me. I am thinking about based on
> StreamingFileSink. And extending StreamingFileSink can solve "partition
> commit" requirement, I have tried in my POC.
And it is true, Recoverable > things and S3 things also important. > So I listed "What is missing" for StreamingFileSink in previous mail. (It > is not defense for making another set from scratch) > > Hi Stephan, > > >> The FLIP is "Filesystem connector in Table", it's about building up > Flink > Table's capabilities. > > What I mean is this is not just for Hive, this FLIP is for table. So we > don't need do all things for Hive. But Hive is also a "format" (or > something else) of Filesystem connector. Its requirements can be > considered. > > I think you are right about the design, and let me take this seriously, a > unify way make us stronger, less confuse, less surprise, more rigorous > design. And I am pretty sure table things are good for enhancing DataStream > api too. > > Hi Kostas, > > Yes, Parquet and Orc are the main formats. Good to know your support~ > > I think streaming file sink and file system connector things are important > to Flink, it is good&time to think about these common requirements, think > about batch support, it is not just about table, it is for whole Flink too. > If there are some requirements that is hard to support or violates existing > design. Exclude them. > > Best, > Jingsong Lee > > > On Wed, Mar 18, 2020 at 10:31 PM Piotr Nowojski <[hidden email]> > wrote: > > > Hi Kurt, > > > > +1 for having some offline discussion on this topic. > > > > But I think the question about using StreamingFileSink or implementing > > subset of it’s feature from scratch is quite fundamental design decision, > > with impact on the behaviour of Public API, so I wouldn’t discard it as > > technical detail and should be included as part of the FLIP (I know It > > could be argued in opposite direction). > > > > Piotrek > > > > > On 18 Mar 2020, at 13:55, Kurt Young <[hidden email]> wrote: > > > > > > Hi all, > > > > > > Thanks for the discuss and feedbacks. 
I think this FLIP doesn't imply > the > > > implementation > > > of such connector yet, it only describes the functionality and expected > > > behaviors from user's > > > perspective. Reusing current StreamingFileSink is definitely one of the > > > possible ways to > > > implement it. Since there are lots of details and I would suggest we > can > > > have an offline meeting > > > to discuss the how these could be achieved by extending > StremingFileSink, > > > and how much > > > effort we need to put on it. What do you think? > > > > > > Best, > > > Kurt > > > > > > > > > On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <[hidden email]> > > wrote: > > > > > >> Hi all, > > >> > > >> I also agree with Stephan on this! > > >> > > >> It has been more than a year now that most of our efforts have had the > > >> "unify" / "unification"/ etc either on their title or in their core > > >> and this has been the focus of all our resources. By deviating from > > >> this now, we only put more stress on other teams in the future. When > > >> the users start using a given API, with high probability, they will > > >> ask (and it is totally reasonable) consistent behaviour from all the > > >> other APIs that ship with Flink. This will eventually lead to having > > >> to answer the questions that we now deem as difficult in a future > > >> release, when we will have to "unify" again. > > >> > > >> In addition, Hive integration is definitely a "nice to have" feature > > >> but it does not mean that we need to push for 100% compatibility if it > > >> is not required. > > >> @Jingsong Li if you think that Parquet and Orc are the main formats, > > >> we can focus on these and provide good support for them (both reading > > >> and writing). For maintainability, I think that given the amount of > > >> demand for these formats, it is not going to be a huge problem at > > >> least for now. 
> > >> > > >> Given the above, I am also leaning towards a solution that aims at > > >> extending the StreamingFileSink to efficiently support bulk formats > > >> like Parquet and Orc, rather than creating a new sink that locks > > >> Hive-dependent usecases to a specific API. > > >> > > >> Cheers, > > >> Kostas > > >> > > >> > > >> > > >> > > >> > > >> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <[hidden email]> > wrote: > > >>> > > >>>>> The FLIP is "Filesystem connector in Table", it's about building up > > >>> Flink Table's capabilities. > > >>> > > >>> That is exactly what worries me. The whole effort is not thinking > about > > >>> Flink as a whole any more. > > >>> This proposal is not trying to build a consistent user experience > > across > > >>> batch and streaming, across Table API, SQL, and DataStream. > > >>> > > >>> The proposal is building a separate, disconnected ecosystem for the > > Table > > >>> API, specific to batch processing and some limited streaming setups. > > >>> Specific to one type of environment (Hive and HDFS). It willingly > omits > > >>> support for other environments and conflicts with efforts in other > > >>> components to unify. > > >>> > > >>> Supporting common use cases is good, but in my opinion not at the > price > > >> of > > >>> creating a "fractured" project where the approaches in different > layers > > >>> don't fit together any more. > > >>> > > >>> > > >>>> *## StreamingFileSink not support writer with path* > > >>>> > > >>>> The FLIP is "Filesystem connector in Table", it's about building up > > >> Flink > > >>>> Table's capabilities. But I think Hive is important, I see that most > > >> users > > >>>> use Flink and Spark to write data from Kafka to hive. Streaming > > >> writing, I > > >>>> see that these two engines are convenient and popular. I mean, Flink > > >> is not > > >>>> only a hive runtime, but also an important part of offline data > > >> warehouse. 
> > >>>> The thing is StreamingFileSink not support hadoop record writers. > Yes, > > >> we > > >>>> can support them one by one. I see the community integrating ORC > [1]. > > >> But > > >>>> it's really not an easy thing. And we have to be careful to maintain > > >>>> compatibility. After all, users downstream use other computing > engines > > >> to > > >>>> analyze. > > >>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is good to > > >>>> subsequent optimization [2]. But there are many cases. It is enough > > for > > >>>> users to generate new files at the checkpoint. They pay more > attention > > >> to > > >>>> whether they can do it and whether there is a risk of compatibility. > > >>>> Therefore, RecordWriter is used here. > > >>>> > > >>>> *## External HDFS access* > > >>>> > > >>>> Including hadoop configuration and Kerberos related things. > > >>>> > > >>>> *## Partition commit* > > >>>> > > >>>> Committing a partition is to notify the downstream application that > > the > > >>>> partition has finished writing, the partition is ready to be > read.The > > >>>> common way is to add a success file or update metastore. Of course, > > >> there > > >>>> are other ways to notify. We need to provide flexible mechanisms. > > >>>> As you mentioned, yes, we can extend "StreamingFileSink" for this > > part. > > >>>> > > >>>> *## Batch / Streaming Unification* > > >>>> > > >>>> Yes, it is about exactly-once and single commit at the end, There > are > > >> also > > >>>> some "bounded" differences. For example, batch can support sorting. > In > > >> this > > >>>> way, you can sort by partition, which can reduce the number of > writers > > >>>> written at the same time. Dynamic partition writing in batch may > > >> produce > > >>>> many unordered partitions. 
> > >>>> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114 > > >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499 > > >>>> > > >>>> Best, > > >>>> Jingsong Lee > > >>>> > > >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <[hidden email] > > > > >>>> wrote: > > >>>> > > >>>>> Hi Jingsong , > > >>>>> > > >>>>> I am looking forward this feature. Because in some streaming > > >>>> application,it > > >>>>> need transfer their messages to hdfs , in order to offline > analysis. > > >>>>> > > >>>>> Best wishes, > > >>>>> LakeShen > > >>>>> > > >>>>> Stephan Ewen <[hidden email]> 于2020年3月17日周二 下午7:42写道: > > >>>>> > > >>>>>> I would really like to see us converging the stack and the > > >>>> functionality > > >>>>>> here. > > >>>>>> Meaning to try and use the same sinks in the Table API as for the > > >>>>>> DataStream API, and using the same sink for batch and streaming. > > >>>>>> > > >>>>>> The StreamingFileSink has a lot of things that can help with that. > > >> If > > >>>>>> possible, it would be nice to extend it (which would help move > > >> towards > > >>>>> the > > >>>>>> above goal) rather than build a second sink. Building a second > sink > > >>>> leads > > >>>>>> us further away from unification. > > >>>>>> > > >>>>>> I am a bit puzzled by the statement that sinks are primarily for > > >> Hive. > > >>>>> The > > >>>>>> Table API should not be coupled to Hive, it should be an > > >> independent > > >>>>>> batch/streaming API for many use cases, supporting very well for > > >> batch > > >>>>> and > > >>>>>> streaming interplay. Supporting Hive is great, but we should not > be > > >>>>>> building this towards Hive, as just yet another Hive runtime. Why > > >> "yet > > >>>>>> another Hive runtime" when what we have a unique streaming engine > > >> that > > >>>>> can > > >>>>>> do much more? We would drop our own strength and reduce ourselves > > >> to a > > >>>>>> limited subset. 
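A note on the commit model discussed in this thread (streaming: finished files are committed at the next checkpoint; batch: a single commit at the end of the job; then a "partition commit" such as a success file tells downstream readers the partition is ready). The following is only a minimal, language-neutral sketch of that idea, written in Python for brevity. It is not Flink code: `PendingFileSink`, `write_file`, `commit`, `commit_partition` and `visible_files` are hypothetical names invented for this illustration, and it relies on an atomic rename on a local file system. As pointed out in the thread, S3 has no rename, which is why Flink's recoverable writers use a different protocol there.

```python
import os
import tempfile


class PendingFileSink:
    """Toy sink (hypothetical): data is written to hidden in-progress files
    and becomes visible to readers only when commit() is called."""

    def __init__(self, base_dir):
        self.base_dir = base_dir
        self.pending = []  # (in-progress path, final path) pairs awaiting commit

    def write_file(self, partition, name, lines):
        part_dir = os.path.join(self.base_dir, partition)
        os.makedirs(part_dir, exist_ok=True)
        tmp_path = os.path.join(part_dir, "." + name + ".inprogress")
        with open(tmp_path, "w") as f:
            f.write("\n".join(lines) + "\n")
        self.pending.append((tmp_path, os.path.join(part_dir, name)))

    def commit(self):
        """Publish all pending files. A streaming job would call this once per
        checkpoint; a batch job would call it once at the end of the job."""
        touched = set()
        for tmp_path, final_path in self.pending:
            # Atomic rename on a local file system (assumption of this sketch).
            os.replace(tmp_path, final_path)
            touched.add(os.path.dirname(final_path))
        self.pending.clear()
        return touched

    def commit_partition(self, part_dir):
        """Partition commit: add a success-file marker for downstream readers."""
        with open(os.path.join(part_dir, "_SUCCESS"), "w"):
            pass


def visible_files(base_dir):
    """Files a downstream reader would see (hidden in-progress files skipped)."""
    found = []
    for root, _dirs, files in os.walk(base_dir):
        for name in files:
            if not name.startswith("."):
                found.append(os.path.relpath(os.path.join(root, name), base_dir))
    return sorted(found)
```

With this shape, the only difference between the streaming and the batch case is how often `commit()` is called, which is the unification argument made in the thread. Updating a metastore instead of writing `_SUCCESS` would be another implementation of `commit_partition`.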
Hi all,

After some great discussion, I think we have at least reached a consensus that we can have a unified sink to handle streaming, batch, Hive and HDFS. And for the FileSystem connector, we will reuse the DataStream StreamingFileSink.

I updated the FLIP:

1. Move external HDFS and close exactly-once to the future plan.
2. Provide implementation details:
- Introduce FileSystemTableFactory, FileSystemTableSource and FileSystemTableSink to provide the table-related implementations. This should not be blocked by FLIP-95 [1], but will migrate to the new interfaces after FLIP-95 is finished.
- Read: reuse FileInputFormat. This should not be blocked by FLIP-27 [2], but will migrate to the new interfaces after FLIP-27 is finished.
- Write: reuse the current batch sink and the DataStream StreamingFileSink.
- What formats should do:
  - Data format: formats should use BaseRow. After FLIP-95, BaseRow is the source/sink data format.
  - Read: formats should provide a FileInputFormat with partition fields support.
  - Write: formats should provide a BulkWriter.Factory or an Encoder for the unified sink implementation (for now, StreamingFileSink).
- We plan to implement CSV, JSON, Parquet and ORC first.

This FLIP describes the basic user interface, and we have reached a consensus on the implementation. Considering there is a lot of work to be done, I'd like to start a voting thread for this. What do you think?

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[2] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong Lee

On Thu, Mar 19, 2020 at 3:48 PM Guowei Ma <[hidden email]> wrote:

> Hi,
>
> I am very interested in the topic. I would like to join the offline
> discussion if possible. I think you guys already give many inputs and
> concerns. I would share some of my thoughts. Correct me if I misunderstand
> you.
>
> Flink is a unified engine.
> Because of that, Flink should provide e2e exactly-once semantics for the
> user in both streaming and batch. E2E exactly-once is not a trivial thing.
>
> StreamingFileSink already does a lot of work on how to support e2e
> exactly-once semantics for the “file” output scenario. It provides a
> layered architecture:
>
> 1. BulkWriter/Encoder deals with the data format in a file.
> 2. BucketAssigner/RollingPolicy deals with the lifecycle of the file and
>    the directory structure.
> 3. RecoverableWriter deals with the exactly-once semantics.
>
> All these layers are orthogonal and could be combined with each other. This
> could reduce much of the work. (Currently, there are some limitations.)
> There are some already known issues, such as how to support batch in the
> StreamingFileSink, how to support ORC and so on. But there are already some
> discussions on how to resolve these issues.
>
> Jingsong also gives some new cases that the StreamingFileSink might not
> support currently. I am very glad to see that you all agree on improving
> the StreamingFileSink architecture for these new cases.
>
> Best,
> Guowei
>
>
> Jingsong Li <[hidden email]> wrote on Thu, Mar 19, 2020 at 12:19 AM:
>
> > Hi Stephan & Kostas & Piotrek, thanks for these inputs,
> >
> > Maybe what I expressed is not clear. For the implementation, I want to
> > know what you think, rather than insisting on making another set from
> > scratch. Piotrek, you are right, the implementation is part of this FLIP
> > too: because we cannot list every detail in the FLIP, the implementation
> > does affect user-visible behavior. And the maintenance / development
> > costs are also points.
> >
> > I think you already persuaded me. I am thinking about basing it on
> > StreamingFileSink. Extending StreamingFileSink can solve the "partition
> > commit" requirement; I have tried this in my POC. And it is true that the
> > Recoverable things and S3 things are also important.
> > So I listed "What is missing" for StreamingFileSink in previous mail. (It
> > is not a defense for making another set from scratch.)
> >
> > Hi Stephan,
> >
> > >> The FLIP is "Filesystem connector in Table", it's about building up
> > >> Flink Table's capabilities.
> >
> > What I mean is that this is not just for Hive; this FLIP is for Table. So
> > we don't need to do everything for Hive. But Hive is also a "format" (or
> > something else) of the Filesystem connector. Its requirements can be
> > considered.
> >
> > I think you are right about the design, and let me take this seriously: a
> > unified way makes us stronger, with less confusion, fewer surprises and a
> > more rigorous design. And I am pretty sure the table things are good for
> > enhancing the DataStream API too.
> >
> > Hi Kostas,
> >
> > Yes, Parquet and Orc are the main formats. Good to know your support~
> >
> > I think the streaming file sink and file system connector are important
> > to Flink. It is a good time to think about these common requirements and
> > about batch support; it is not just about Table, it is for the whole of
> > Flink too. If there are some requirements that are hard to support or
> > that violate the existing design, exclude them.
> >
> > Best,
> > Jingsong Lee
> >
> >
> > On Wed, Mar 18, 2020 at 10:31 PM Piotr Nowojski <[hidden email]>
> > wrote:
> >
> > > Hi Kurt,
> > >
> > > +1 for having some offline discussion on this topic.
> > >
> > > But I think the question about using StreamingFileSink or implementing
> > > a subset of its features from scratch is quite a fundamental design
> > > decision, with impact on the behaviour of the Public API, so I wouldn’t
> > > discard it as a technical detail, and it should be included as part of
> > > the FLIP (I know it could be argued in the opposite direction).
> > >
> > > Piotrek
> > >
> > > > On 18 Mar 2020, at 13:55, Kurt Young <[hidden email]> wrote:
> > > >
> > > > Hi all,
> > > >
> > > > Thanks for the discussion and feedback.
> > > > I think this FLIP doesn't imply the implementation of such a
> > > > connector yet; it only describes the functionality and expected
> > > > behaviors from the user's perspective. Reusing the current
> > > > StreamingFileSink is definitely one of the possible ways to
> > > > implement it. Since there are lots of details, I would suggest we
> > > > have an offline meeting to discuss how these could be achieved by
> > > > extending StreamingFileSink, and how much effort we need to put
> > > > into it. What do you think?
> > > >
> > > > Best,
> > > > Kurt
> > > >
> > > >
> > > > On Wed, Mar 18, 2020 at 7:21 PM Kostas Kloudas <[hidden email]>
> > > > wrote:
> > > >
> > > >> Hi all,
> > > >>
> > > >> I also agree with Stephan on this!
> > > >>
> > > >> It has been more than a year now that most of our efforts have had the
> > > >> "unify" / "unification" / etc. either in their title or in their core
> > > >> and this has been the focus of all our resources. By deviating from
> > > >> this now, we only put more stress on other teams in the future. When
> > > >> the users start using a given API, with high probability, they will
> > > >> ask for (and it is totally reasonable) consistent behaviour from all
> > > >> the other APIs that ship with Flink. This will eventually lead to
> > > >> having to answer the questions that we now deem as difficult in a
> > > >> future release, when we will have to "unify" again.
> > > >>
> > > >> In addition, Hive integration is definitely a "nice to have" feature
> > > >> but it does not mean that we need to push for 100% compatibility if it
> > > >> is not required.
> > > >> @Jingsong Li if you think that Parquet and Orc are the main formats,
> > > >> we can focus on these and provide good support for them (both reading
> > > >> and writing). For maintainability, I think that given the amount of
> > > >> demand for these formats, it is not going to be a huge problem at
> > > >> least for now.
> > > >>
> > > >> Given the above, I am also leaning towards a solution that aims at
> > > >> extending the StreamingFileSink to efficiently support bulk formats
> > > >> like Parquet and Orc, rather than creating a new sink that locks
> > > >> Hive-dependent usecases to a specific API.
> > > >>
> > > >> Cheers,
> > > >> Kostas
> > > >>
> > > >>
> > > >> On Wed, Mar 18, 2020 at 12:03 PM Stephan Ewen <[hidden email]> wrote:
> > > >>>
> > > >>>>> The FLIP is "Filesystem connector in Table", it's about building up
> > > >>>>> Flink Table's capabilities.
> > > >>>
> > > >>> That is exactly what worries me. The whole effort is not thinking
> > > >>> about Flink as a whole any more.
> > > >>> This proposal is not trying to build a consistent user experience
> > > >>> across batch and streaming, across Table API, SQL, and DataStream.
> > > >>>
> > > >>> The proposal is building a separate, disconnected ecosystem for the
> > > >>> Table API, specific to batch processing and some limited streaming
> > > >>> setups. Specific to one type of environment (Hive and HDFS). It
> > > >>> willingly omits support for other environments and conflicts with
> > > >>> efforts in other components to unify.
> > > >>>
> > > >>> Supporting common use cases is good, but in my opinion not at the
> > > >>> price of creating a "fractured" project where the approaches in
> > > >>> different layers don't fit together any more.
> > > >>>
> > > >>>
> > > >>>> *## StreamingFileSink does not support writers with a path*
> > > >>>>
> > > >>>> The FLIP is "Filesystem connector in Table", it's about building up
> > > >>>> Flink Table's capabilities. But I think Hive is important, I see that
> > > >>>> most users use Flink and Spark to write data from Kafka to Hive. For
> > > >>>> streaming writing, I see that these two engines are convenient and
> > > >>>> popular.
> > > >>>> I mean, Flink is not only a Hive runtime, but also an important part
> > > >>>> of the offline data warehouse.
> > > >>>> The thing is, StreamingFileSink does not support Hadoop record
> > > >>>> writers. Yes, we can support them one by one. I see the community
> > > >>>> integrating ORC [1]. But it's really not an easy thing. And we have
> > > >>>> to be careful to maintain compatibility. After all, downstream users
> > > >>>> use other computing engines to analyze the data.
> > > >>>> Yes, exposing "RecoverableFsDataOutputStream" to writers is good for
> > > >>>> subsequent optimization [2]. But in many cases it is enough for
> > > >>>> users to generate new files at the checkpoint. They pay more
> > > >>>> attention to whether they can do it and whether there is a
> > > >>>> compatibility risk.
> > > >>>> Therefore, RecordWriter is used here.
> > > >>>>
> > > >>>> *## External HDFS access*
> > > >>>>
> > > >>>> Including Hadoop configuration and Kerberos related things.
> > > >>>>
> > > >>>> *## Partition commit*
> > > >>>>
> > > >>>> Committing a partition notifies the downstream application that the
> > > >>>> partition has finished writing and is ready to be read. The common
> > > >>>> way is to add a success file or update the metastore. Of course,
> > > >>>> there are other ways to notify. We need to provide flexible
> > > >>>> mechanisms.
> > > >>>> As you mentioned, yes, we can extend "StreamingFileSink" for this
> > > >>>> part.
> > > >>>>
> > > >>>> *## Batch / Streaming Unification*
> > > >>>>
> > > >>>> Yes, it is about exactly-once and a single commit at the end. There
> > > >>>> are also some "bounded" differences. For example, batch can support
> > > >>>> sorting. In this way, you can sort by partition, which can reduce the
> > > >>>> number of writers writing at the same time.
> > > >>>> Dynamic partition writing in batch may produce many unordered
> > > >>>> partitions.
> > > >>>>
> > > >>>> [1] https://issues.apache.org/jira/browse/FLINK-10114
> > > >>>> [2] https://issues.apache.org/jira/browse/FLINK-11499
> > > >>>>
> > > >>>> Best,
> > > >>>> Jingsong Lee
> > > >>>>
> > > >>>> On Tue, Mar 17, 2020 at 8:00 PM LakeShen <[hidden email]>
> > > >>>> wrote:
> > > >>>>
> > > >>>>> Hi Jingsong,
> > > >>>>>
> > > >>>>> I am looking forward to this feature, because some streaming
> > > >>>>> applications need to transfer their messages to HDFS for offline
> > > >>>>> analysis.
> > > >>>>>
> > > >>>>> Best wishes,
> > > >>>>> LakeShen
> > > >>>>>
> > > >>>>> Stephan Ewen <[hidden email]> wrote on Tue, Mar 17, 2020 at 7:42 PM:
> > > >>>>>
> > > >>>>>> I would really like to see us converging the stack and the
> > > >>>>>> functionality here.
> > > >>>>>> Meaning to try and use the same sinks in the Table API as for the
> > > >>>>>> DataStream API, and using the same sink for batch and streaming.
> > > >>>>>>
> > > >>>>>> The StreamingFileSink has a lot of things that can help with that.
> > > >>>>>> If possible, it would be nice to extend it (which would help move
> > > >>>>>> towards the above goal) rather than build a second sink. Building a
> > > >>>>>> second sink leads us further away from unification.
> > > >>>>>>
> > > >>>>>> I am a bit puzzled by the statement that sinks are primarily for
> > > >>>>>> Hive. The Table API should not be coupled to Hive, it should be an
> > > >>>>>> independent batch/streaming API for many use cases, supporting very
> > > >>>>>> well for batch and streaming interplay. Supporting Hive is great,
> > > >>>>>> but we should not be building this towards Hive, as just yet
> > > >>>>>> another Hive runtime. Why "yet another Hive runtime" when what we
> > > >>>>>> have is a unique streaming engine that can do much more? We would
> > > >>>>>> drop our own strength and reduce ourselves to a limited subset.
> > > >>>>>>
> > > >>>>>> Let's build a File Sink that can also support Hive, but can do so
> > > >>>>>> much more. For example, efficient streaming file ingestion as
> > > >>>>>> materialized views from changelogs.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Writing Files in Streaming*
> > > >>>>>>
> > > >>>>>> To write files in streaming, I don't see another way than using the
> > > >>>>>> streaming file sink. If you want to write files across checkpoints,
> > > >>>>>> support exactly-once, and support consistent "stop with savepoint",
> > > >>>>>> it is not trivial.
> > > >>>>>>
> > > >>>>>> A part of the complexity comes from the fact that not all targets
> > > >>>>>> are actually file systems, and not all have simple semantics for
> > > >>>>>> persistence. S3 for example does not support renames (only copies,
> > > >>>>>> which may take a lot of time) and it does not support flush/sync of
> > > >>>>>> data (the S3 file system in Hadoop exposes that but it does not
> > > >>>>>> work. flush/sync, followed by a failure, leads to data loss). You
> > > >>>>>> need to devise a separate protocol for that, which is exactly what
> > > >>>>>> has already been done and abstracted behind the recoverable
> > > >>>>>> writers.
> > > >>>>>>
> > > >>>>>> If you re-engineer that in the, you will end up either missing many
> > > >>>>>> things (intermediate persistence on different file systems, and
> > > >>>>>> atomic commit in the absence of renames, etc.), or you end up doing
> > > >>>>>> something similar to what the recoverable writers do.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Atomic Commit in Batch*
> > > >>>>>>
> > > >>>>>> For batch sinks, it is also desirable to write the data first and
> > > >>>>>> then atomically commit it once the job is done.
> > > >>>>>> Hadoop has spent a lot of time making this work, see this doc here,
> > > >>>>>> specifically the section on 'The "Magic" Committer'. [1]
> > > >>>>>>
> > > >>>>>> What Flink has built in the RecoverableWriter is in some way an
> > > >>>>>> even better version of this, because it works without extra files
> > > >>>>>> (we pass data through checkpoint state) and it supports not only
> > > >>>>>> committing once at the end, but committing multiple times
> > > >>>>>> intermediate parts during checkpoints.
> > > >>>>>>
> > > >>>>>> Meaning using the recoverable writer mechanism in batch would allow
> > > >>>>>> us to immediately get the efficient atomic commit implementations
> > > >>>>>> on file:// hdfs:// and s3://, with a well defined way to implement
> > > >>>>>> it also for other file systems.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## Batch / Streaming Unification*
> > > >>>>>>
> > > >>>>>> It would be great to start looking at these things in the same way:
> > > >>>>>> - streaming (exactly-once): commits files (after finished) at the
> > > >>>>>>   next checkpoint
> > > >>>>>> - batch: single commit at the end of the job
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## DataStream / Table API Stack Unification*
> > > >>>>>>
> > > >>>>>> Having the same set of capabilities would make it much easier for
> > > >>>>>> users to understand the system.
> > > >>>>>> Especially when it comes to consistent behavior across external
> > > >>>>>> systems. Having a different file sink in Table API and DataStream
> > > >>>>>> API means that DataStream can write correctly to S3 while Table API
> > > >>>>>> cannot.
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> *## What is missing?*
> > > >>>>>>
> > > >>>>>> It seems there are some things that get in the way of naturally
> > > >>>>>> Can you make a list of what features are missing in the
> > > >>>>>> StreamingFileSink that make it usable for the use cases you have in
> > > >>>>>> mind?
> > > >>>>>>
> > > >>>>>> Best,
> > > >>>>>> Stephan
> > > >>>>>>
> > > >>>>>> [1] https://hadoop.apache.org/docs/current/hadoop-aws/tools/hadoop-aws/committer_architecture.html
> > > >>>>>>
> > > >>>>>>
> > > >>>>>> On Mon, Mar 16, 2020 at 12:31 PM Jingsong Li <[hidden email]>
> > > >>>>>> wrote:
> > > >>>>>>
> > > >>>>>>> Hi Piotr,
> > > >>>>>>>
> > > >>>>>>> I am very torn.
> > > >>>>>>>
> > > >>>>>>> Let me re-list the table streaming sink requirements:
> > > >>>>>>> - In table, maybe 90% of sinks are for Hive. Parquet and ORC are
> > > >>>>>>> the most important formats.
Hive provide RecordWriters, it is easy to > > > >> support > > > >>>>> all > > > >>>>>>> hive formats by using it, and we don't need concern hive > version > > > >>>>>>> compatibility too, but it can not work with FSDataOutputStream. > > > >>>>>>> - Hive table maybe use external HDFS. It means, hive has its > own > > > >>>> hadoop > > > >>>>>>> configuration. > > > >>>>>>> - In table, partition commit is needed, we can not just move > > > >> files, > > > >>>> it > > > >>>>> is > > > >>>>>>> important to complete table semantics to update catalog. > > > >>>>>>> > > > >>>>>>> You are right DataStream and Table streaming sink will not be > > > >> fully > > > >>>>>>> compatible, each with its own set of limitations, quirks and > > > >>>> features. > > > >>>>>>> But if re-using DataStream, batch and streaming also will not > be > > > >>>> fully > > > >>>>>>> compatible. Provide a unify experience to batch and streaming > is > > > >> also > > > >>>>>>> important. > > > >>>>>>> > > > >>>>>>> Table and DataStream have different concerns, and they tilt in > > > >>>>> different > > > >>>>>>> directions. > > > >>>>>>> > > > >>>>>>> Of course, it is very good to see a unify implementation to > solve > > > >>>> batch > > > >>>>>>> sink and hive things, unify DataStream batch sink and > DataStream > > > >>>>>> streaming > > > >>>>>>> sink and Table batch sink and Table streaming sink. > > > >>>>>>> > > > >>>>>>> Le's see what others think. > > > >>>>>>> > > > >>>>>>> Best, > > > >>>>>>> Jingsong Lee > > > >>>>>>> > > > >>>>>>> > > > >>>>>>> On Mon, Mar 16, 2020 at 4:15 PM Piotr Nowojski < > > > >> [hidden email]> > > > >>>>>>> wrote: > > > >>>>>>> > > > >>>>>>>> Hi Jingsong, > > > >>>>>>>> > > > >>>>>>>>> First way is reusing Batch sink in FLINK-14254, It has > > > >> handled > > > >>>> the > > > >>>>>>>> partition and metastore logic well. > > > >>>>>>>>> - unify batch and streaming > > > >>>>>>>>> - Using FileOutputFormat is consistent with FileInputFormat. 
> - Add exactly-once related logic. Just 200+ lines of code.
> - It's natural to support more table features, like partition commit, auto compact, etc.
>
> Second way is reusing DataStream StreamingFileSink:
> - unify streaming sink between Table and DataStream.
> - It may be hard to introduce table related features to StreamingFileSink.
>
> I prefer the first way a little. What do you think?

I would be surprised if adding “exactly-once related logic” is just 200 lines of code. There are things like multipart file upload to S3 and there are also some pending features like [1]. I would suggest asking/involving Klou in this discussion.

If it’s as easy to support exactly-once streaming with the current batch sink, that begs the question: why do we need to maintain StreamingFileSink?

The worst possible outcome from my perspective would be if we have another example of an operator/logic implemented independently in both the DataStream API and the Table API, because I’m pretty sure they will not be fully compatible, each with its own set of limitations, quirks and features. Especially since unifying this kind of operator is on our long-term roadmap and wish list.

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-11499

On 16 Mar 2020, at 06:55, Jingsong Li <[hidden email]> wrote:

Thanks Jinhai for getting involved.

> we need add 'connector.sink.username' for UserGroupInformation when data is written to HDFS

Yes, I am not an HDFS expert, but it seems we need to do this "doAs" in the code to access an external HDFS. I will update the document.

Best,
Jingsong Lee

On Mon, Mar 16, 2020 at 12:01 PM Jingsong Li <[hidden email]> wrote:

Thanks Piotr and Yun for getting involved.

Hi Piotr and Yun, for implementation,

FLINK-14254 [1] introduces the batch sink to the table world. It deals with partitions, the metastore, etc., and it just reuses the DataSet/DataStream FileInputFormat and FileOutputFormat. The filesystem connector cannot do without FileInputFormat, because it needs to deal with files and splits. Formats like ORC and Parquet need to read the whole file and have different split logic.

So back to the file system connector:
- It needs to introduce FilesystemTableFactory, FilesystemTableSource and FilesystemTableSink.
- For sources, reuse the DataSet/DataStream FileInputFormats; there is no other interface for reading files.

For file sinks:
- Batch sink uses FLINK-14254
- Streaming sink has two ways.

First way is reusing Batch sink in FLINK-14254. It has handled the partition and metastore logic well.
- unify batch and streaming
- Using FileOutputFormat is consistent with FileInputFormat.
- Add exactly-once related logic. Just 200+ lines of code.
- It's natural to support more table features, like partition commit, auto compact, etc.

Second way is reusing DataStream StreamingFileSink:
- unify streaming sink between Table and DataStream.
- It may be hard to introduce table related features to StreamingFileSink.

I prefer the first way a little. What do you think?

Hi Yun,

> Watermark mechanism might not be enough.

Watermarks of subtasks are the same in the "snapshotState".

> we might need to also do some coordination between subtasks.

Yes, the JobMaster is the role that controls subtasks. The metastore is a very fragile single point that cannot be accessed in a distributed way, so it is accessed uniformly by the JobMaster.
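
To illustrate the coordination idea in the reply to Yun above, here is a minimal Python sketch. It is not taken from the FLIP: the class name `PartitionCommitCoordinator`, its methods, and the rule "commit once the minimum watermark across subtasks has passed the partition's end time" are all assumptions made for illustration, and the metastore call is replaced by appending to a list.

```python
from dataclasses import dataclass, field


@dataclass
class PartitionCommitCoordinator:
    """Hypothetical single coordinator, standing in for the JobMaster role."""

    num_subtasks: int
    # Latest watermark reported by each subtask (subtask index -> watermark).
    watermarks: dict = field(default_factory=dict)
    # Partitions written but not yet committed (partition name -> end time).
    pending: dict = field(default_factory=dict)
    # Stand-in for the metastore: partitions that have been committed.
    committed: list = field(default_factory=list)

    def report(self, subtask, watermark, partitions):
        """Record a subtask's watermark and the partitions it has written."""
        previous = self.watermarks.get(subtask, watermark)
        self.watermarks[subtask] = max(previous, watermark)
        self.pending.update(partitions)

    def on_checkpoint_complete(self):
        """Commit partitions whose end time every subtask has passed."""
        if len(self.watermarks) < self.num_subtasks:
            return []  # not all subtasks have reported yet
        low = min(self.watermarks.values())
        ready = sorted(p for p, end in self.pending.items() if end <= low)
        for partition in ready:
            del self.pending[partition]
            self.committed.append(partition)  # assumed single metastore access
        return ready
```

Because only the coordinator touches `committed`, the metastore stand-in is updated from one place even when a partition spans several subtasks.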

[1] https://issues.apache.org/jira/browse/FLINK-14254

Best,
Jingsong Lee

On Fri, Mar 13, 2020 at 6:43 PM Yun Gao <[hidden email]> wrote:

Hi,

Many thanks to Jingsong for bringing up this discussion! Enhancing the FileSystem connector in Table should largely improve usability.

I have the same question as Piotr. From my side, I think it would be better to be able to reuse the existing StreamingFileSink. I think we have begun enhancing the supported file formats (e.g., ORC, Avro...), and reusing StreamingFileSink should avoid repeated work in the Table library. Besides, the bucket concept also seems to match the semantics of partition.

For the notification of adding partitions, I wonder whether the Watermark mechanism might not be enough, since a Bucket/Partition might span multiple subtasks.
It depends on the level of notification: if we want to notify for the bucket on each subtask, using the watermark to notify each subtask should be OK, but if we want to notify for the whole Bucket/Partition, we might also need some coordination between subtasks.

Best,
Yun

------------------------------------------------------------------
From: Piotr Nowojski <[hidden email]>
Send Time: 2020 Mar. 13 (Fri.) 18:03
To: dev <[hidden email]>
Cc: user <[hidden email]>; user-zh <[hidden email]>
Subject: Re: [DISCUSS] FLIP-115: Filesystem connector in Table

Hi,

Which actual sinks/sources are you planning to use in this feature? Is it about exposing StreamingFileSink in the Table API? Or do you want to implement new Sinks/Sources?

Piotrek

On 13 Mar 2020, at 10:04, jinhai wang <[hidden email]> wrote:

Thanks for FLIP-115. It is a really useful feature for platform developers who manage hundreds of Flink to Hive jobs in production.

I think we need to add 'connector.sink.username' for UserGroupInformation when data is written to HDFS.

On 2020/3/13 at 3:33 PM, "Jingsong Li" <[hidden email]> wrote:

Hi everyone,

I'd like to start a discussion about FLIP-115 Filesystem connector in Table [1].
This FLIP will bring:
- Introduce Filesystem table factory in table, support csv/parquet/orc/json/avro formats.
- Introduce streaming filesystem/hive sink in table

CC to user mail list, if you have any unmet needs, please feel free to reply~

Look forward to hearing from you.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-115%3A+Filesystem+connector+in+Table

Best,
Jingsong Lee

--
Best, Jingsong Lee