Hi all,
I'd like to discuss the semantics of returning null from InputFormat#nextRecord.

Currently there is no explicit Javadoc about this, and there are three ways it is handled in Flink:
1. treat null as the end of input
2. skip null
3. assume that the value returned from InputFormat#nextRecord cannot be null

I quickly searched the Flink codebase for these usages:
- org.apache.flink.api.common.operators.GenericDataSourceBase [2]
- org.apache.flink.api.java.io.CsvInputFormat [2]
- org.apache.flink.runtime.operators.DataSourceTask [2]
- org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator [2]
- org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction [1]
- org.apache.flink.table.sources.CsvTableSource [1]
- org.apache.flink.table.runtime.io.CRowValuesInputFormat [3]
- org.apache.flink.table.filesystem.FileSystemLookupFunction [3]

I think we can align these behaviors. As for the alignment, I personally lean toward #2.

A step further: when would InputFormat#nextRecord return null? One scenario is that we encountered dirty data and want to skip it. We actually faced the same problem in org.apache.flink.api.common.serialization.DeserializationSchema in the past, and in 1.11 we added a method `void deserialize(byte[] message, Collector<T> out)`. Its default behavior is to ignore a null return value.

Could we then also add a method `void nextRecord(OT reuse, Collector<OT> collector)` to InputFormat? Adding this method would also allow returning multiple records in one call, which is very flexible for implementing an InputFormat.

WDYT?

--

Best,
Benchao Li
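As a rough illustration of the proposal, the new method could default to skipping null, mirroring DeserializationSchema. `MiniCollector` and `MiniInputFormat` below are simplified stand-ins for the real Flink interfaces, not actual Flink classes:

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.flink.util.Collector.
interface MiniCollector<T> {
    void collect(T record);
}

// Stand-in for the relevant slice of InputFormat.
abstract class MiniInputFormat<OT> {
    public abstract boolean reachedEnd();

    public abstract OT nextRecord(OT reuse);

    // Proposed addition (sketch): a null return value is silently dropped,
    // mirroring DeserializationSchema#deserialize(byte[], Collector) from 1.11.
    // An implementation could also override this to emit several records per call.
    public void nextRecord(OT reuse, MiniCollector<OT> out) {
        OT record = nextRecord(reuse);
        if (record != null) {
            out.collect(record);
        }
    }
}

public class CollectorSketch {
    public static void main(String[] args) {
        // A toy format producing "a", a dirty record (null), then "b".
        MiniInputFormat<String> format = new MiniInputFormat<String>() {
            private final String[] data = {"a", null, "b"};
            private int pos = 0;

            @Override
            public boolean reachedEnd() {
                return pos >= data.length;
            }

            @Override
            public String nextRecord(String reuse) {
                return data[pos++];
            }
        };

        List<String> collected = new ArrayList<>();
        while (!format.reachedEnd()) {
            format.nextRecord(null, collected::add);
        }
        System.out.println(collected); // the dirty record is dropped
    }
}
```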
Hi Benchao,
My understanding is that #1 treats null as the end of input. This means implementations of `InputFormat` should try their best to avoid returning null before the end of input; even if they have to return null, it should only happen at the end of input. I think this is the safest interpretation: a format following it works for callers that assume either #1 or #2.

As for #3, I think we should avoid it. Actually, both `CRowValuesInputFormat` and `FileSystemLookupFunction` have corresponding implementations, and the specific `InputFormat`s can ensure they behave well with respect to `reachedEnd` and `nextRecord`. Of course, we could also make these invocations more robust.

Actually, IIUC, InputFormat is a legacy interface; the new interface is the one from FLIP-27 [1], and we are planning to implement FileSource on the new interfaces too.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

Best,
Jingsong
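The divergence between semantics #1 and #2 only shows up once a format returns null mid-stream; the two caller loops can be contrasted with a toy sketch (simplified stand-ins, not Flink's real API):

```java
import java.util.ArrayList;
import java.util.List;

public class NullSemantics {

    interface Format {
        boolean reachedEnd();
        String nextRecord();
    }

    // Produces "a", then a null in the middle, then "b".
    static Format newFormat() {
        return new Format() {
            private final String[] data = {"a", null, "b"};
            private int pos = 0;
            public boolean reachedEnd() { return pos >= data.length; }
            public String nextRecord() { return data[pos++]; }
        };
    }

    // Semantics #1: null terminates the input
    // (the InputFormatSourceFunction style from the list above).
    static List<String> readTreatingNullAsEnd(Format f) {
        List<String> out = new ArrayList<>();
        while (!f.reachedEnd()) {
            String rec = f.nextRecord();
            if (rec == null) {
                break;
            }
            out.add(rec);
        }
        return out;
    }

    // Semantics #2: null is skipped
    // (the DataSourceTask style from the list above).
    static List<String> readSkippingNull(Format f) {
        List<String> out = new ArrayList<>();
        while (!f.reachedEnd()) {
            String rec = f.nextRecord();
            if (rec != null) {
                out.add(rec);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(readTreatingNullAsEnd(newFormat())); // stops at the null
        System.out.println(readSkippingNull(newFormat()));      // skips the null
    }
}
```

A format that never returns null before `reachedEnd()` makes both loops behave identically, which is why interpretation #1 is safe under either caller.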
Thanks Jingsong for the response.
If we plan to deprecate InputFormat in the near future, I think it's OK to keep the current behavior as it is for now.

I raised this discussion because I found that InputFormatSourceFunction was not working as expected while I was developing a batch table source internally. Our batch table source reads raw data from our internal storage, which has the same interface as Kafka, so we can leverage all the streaming formats, e.g. JSON/protobuf. It's therefore common for us to encounter dirty data that we want to skip, just like in DeserializationSchema. That's why I prefer #2.

Anyway, if we are going to deprecate InputFormat in the near future, I think there is no need to change this in the community.

--

Best,
Benchao Li
Hi Benchao,
I'm very glad that you are developing the batch ecosystem~

More detail on #1 in `CsvInputFormat`:
- There are dirty records (comments) handled in `CsvInputFormat.readRecord`, so unfortunately it must return null.
- Left as-is, that null would surface from `nextRecord` as in #2, so `CsvInputFormat` overrides `nextRecord` like this:

    @Override
    public OUT nextRecord(OUT record) throws IOException {
        OUT returnRecord = null;
        do {
            returnRecord = super.nextRecord(record);
        } while (returnRecord == null && !reachedEnd());

        return returnRecord;
    }

Best,
Jingsong
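The skip-null override pattern from `CsvInputFormat` can be reproduced in a runnable toy form; `LineFormat` below is a simplified stand-in (comment lines play the role of dirty records), not Flink's actual class:

```java
public class SkipNullOverride {

    static class LineFormat {
        private final String[] lines;
        private int pos = 0;

        LineFormat(String... lines) { this.lines = lines; }

        public boolean reachedEnd() { return pos >= lines.length; }

        // Returns null for comment lines, analogous to how
        // CsvInputFormat.readRecord handles comments.
        public String nextRecord(String reuse) {
            String line = lines[pos++];
            return line.startsWith("#") ? null : line;
        }
    }

    static class SkippingLineFormat extends LineFormat {
        SkippingLineFormat(String... lines) { super(lines); }

        // Same shape as the CsvInputFormat override: loop past nulls until a
        // real record arrives or the end of the split is reached.
        @Override
        public String nextRecord(String reuse) {
            String returnRecord = null;
            do {
                returnRecord = super.nextRecord(reuse);
            } while (returnRecord == null && !reachedEnd());
            return returnRecord;
        }
    }

    public static void main(String[] args) {
        SkippingLineFormat format =
                new SkippingLineFormat("# header", "1,foo", "# comment", "2,bar");
        while (!format.reachedEnd()) {
            String rec = format.nextRecord(null);
            if (rec != null) { // null can still appear if the split ends on a comment
                System.out.println(rec);
            }
        }
    }
}
```

Note that even with the override, a caller must still tolerate one trailing null when the last line of a split is a comment, which is why the final null check remains.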
Hi Jingsong,
Thanks for the input. Our current implementation is very much like the one you posted.

Btw, we are working on streaming/batch unification using our Flink SQL engine, and we hope to contribute more in this area.

--

Best,
Benchao Li