Hi Flink developers,
When implementing `JDBCTableSource` with the `ProjectableTableSource` interface, I'm confused by the `projectFields` method.

The Javadoc of `projectFields` states (it also has a typo: poduced -> produced):

> Creates a copy of the {@link TableSource} that projects its output to the
> given field indexes.
> The field indexes relate to the physical poduced data type ({@link
> TableSource#getProducedDataType()}) and not to the table schema ({@link
> TableSource#getTableSchema} of the {@link TableSource}.

So my understanding of this Javadoc is: if the table schema of the source is {a: Int, b: Double, c: String, d: Long} and the produced data type of the source is {a: Int, c: String, d: Long}, then when a user writes "select c, d from my_table", the projected field indices should be {1, 2} instead of {2, 3} (because they should relate to the produced type and not to the schema).

But the implementation of `CSVTableSource` says otherwise: the field indices relate to the schema, not to the produced data type.

I followed the implementation of `CSVTableSource` to implement the JDBC table source (as `CSVTableSource` obviously passes all the test cases). So which one is correct, my understanding of the Javadoc or the implementation of `CSVTableSource`?

Thanks.
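To make the two readings concrete, here is a minimal sketch in plain Java. It does not use any Flink classes; `ProjectionIndexDemo` and `indicesOf` are hypothetical names introduced only for illustration. It computes the indices of "c" and "d" once against the table schema and once against the produced type from the example above.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink: it only illustrates the two index conventions.
class ProjectionIndexDemo {

    // Returns the position of each selected field name inside the given field list.
    static int[] indicesOf(List<String> fields, List<String> selected) {
        int[] result = new int[selected.size()];
        for (int i = 0; i < selected.size(); i++) {
            int idx = fields.indexOf(selected.get(i));
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown field: " + selected.get(i));
            }
            result[i] = idx;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> tableSchema = Arrays.asList("a", "b", "c", "d");
        List<String> producedType = Arrays.asList("a", "c", "d");
        List<String> selected = Arrays.asList("c", "d"); // select c, d from my_table

        // Indices relative to the table schema.
        System.out.println(Arrays.toString(indicesOf(tableSchema, selected)));  // [2, 3]
        // Indices relative to the produced type.
        System.out.println(Arrays.toString(indicesOf(producedType, selected))); // [1, 2]
    }
}
```

The question in this message is which of the two printed arrays is what `projectFields` receives.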
Hi Caizhi,
from my understanding, the "ProjectableTableSource" interface is used for something like predicate push-down scenarios, where the produced output should match what your SELECT statement requires.

For example, in the case of:
SourceSchema: {a: Int, b: Double, c: String, d: Long}
SQL: "select c, d from my_table"

If the source implements ProjectableTableSource, Flink will invoke the projectFields method to create another TableSource that does not return the full schema, but only the selected fields. (This is particularly useful for columnar storage formats like Parquet, where only a subset of fields gets read into Flink.)

So, in short, when Flink invokes your table source's "projectFields" override, it will not pass {1,2} as the argument but {2,3}, i.e. fields "c" and "d".

--------
This also brings up an interesting question: in your example, you mentioned that the table source already has a produced type of {a: Int, c: String, d: Long}. I assume you are asking this with something like the following in mind: two SQL queries act on the source, "SELECT a, c ..." and "SELECT c, d ...".

I am not 100% sure, since it has been some time since I looked at the code, but my understanding is: projectFields will be invoked twice and generate 2 new instances of the table source, with the same table schema but one with produced type {a: Int, c: String} and one with {c: String, d: Long}. So there will not be a table source with {a: Int, c: String, d: Long}.

Thanks,
Rong

On Thu, Jul 11, 2019 at 9:53 PM Caizhi Weng <[hidden email]> wrote:
> Hi Flink developers,
> [...]
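The behaviour described in this reply (each query gets its own projected copy, while the table schema stays the same) can be sketched with a toy class. This is a simplified stand-in written in plain Java, not Flink's actual `TableSource` or `ProjectableTableSource`; `ToySource` and `producedFields` are hypothetical names, and only the method name `projectFields(int[] fields)` is borrowed from the thread.

```java
import java.util.Arrays;

// Simplified stand-in for a projectable source; it does not use or mirror Flink's real classes.
final class ToySource {
    private final String[] schemaFields; // full table schema, never changes
    private final int[] selectedFields;  // indexes into schemaFields that are actually read

    ToySource(String[] schemaFields) {
        this(schemaFields, identity(schemaFields.length));
    }

    private ToySource(String[] schemaFields, int[] selectedFields) {
        this.schemaFields = schemaFields.clone();
        this.selectedFields = selectedFields.clone();
    }

    private static int[] identity(int n) {
        int[] r = new int[n];
        for (int i = 0; i < n; i++) {
            r[i] = i;
        }
        return r;
    }

    // Returns a new copy that reads only the given schema positions; the receiver is untouched.
    ToySource projectFields(int[] fields) {
        return new ToySource(schemaFields, fields);
    }

    // Names of the fields this instance would actually produce.
    String[] producedFields() {
        String[] out = new String[selectedFields.length];
        for (int i = 0; i < out.length; i++) {
            out[i] = schemaFields[selectedFields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        ToySource full = new ToySource(new String[] {"a", "b", "c", "d"});
        ToySource q1 = full.projectFields(new int[] {0, 2}); // SELECT a, c ...
        ToySource q2 = full.projectFields(new int[] {2, 3}); // SELECT c, d ...

        System.out.println(Arrays.toString(q1.producedFields()));   // [a, c]
        System.out.println(Arrays.toString(q2.producedFields()));   // [c, d]
        System.out.println(Arrays.toString(full.producedFields())); // [a, b, c, d]
    }
}
```

In this sketch no instance ever ends up producing {a, c, d}, which matches the expectation stated above for the two-query case.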
Hi Rong,
Thanks for your explanation. What I'm wondering when implementing this interface is: will `projectFields` be called twice in a row (something like `source.projectFields().projectFields()`)? For example, if a user writes "select c, d from (select a, c, d from my_table)". In this case we would have to consider whether the indices relate to the produced type or to the table schema.

Thanks.

On Sat, Jul 13, 2019 at 1:23 AM Rong Rong <[hidden email]> wrote:
> Hi Caizhi,
> [...]
I am pretty confident that projectFields will not be called twice, but that's not actually a problem, and I think I understand your question better now.

According to the API doc: "The field indexes relate to the physical [produced] data type ({@link TableSource#getProducedDataType()})"

To me this means: "field indexes (as a noun) relate (as a verb) to the produced data type", i.e. the "fields" argument controls what the produced data type is.

This also means that the `fields` argument of `projectFields(int[] fields)` is indexed against the table schema. So even if it is called twice, you should always treat the `fields` integer array as indexed against the table schema, which is immutable.

------
On a side note, regarding the SQL "select c, d from (select a, c, d from my_table)": I am not 100% sure about the implementation details, but IMO only the inner-layer "from" statement triggers the projectable table source rule, as it acts directly on the table source, regardless of whether the outer-layer select fields get pushed all the way down to the inner-layer select. I think you can try to debug a bit against the Calcite rules to see when "PushProjectIntoTableSourceScanRule" gets invoked.

--
Rong

On Fri, Jul 12, 2019 at 8:33 PM Caizhi Weng <[hidden email]> wrote:
> Hi Rong,
> [...]
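As a rough illustration of why the choice of convention would matter if a source were ever projected twice, here is a small sketch in plain Java. It involves no Flink code; `ChainedProjectionDemo` and `pick` are hypothetical names, and the nested query from this thread is assumed to arrive as the schema indexes {0, 2, 3} followed by {2, 3}.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not Flink code.
class ChainedProjectionDemo {

    // Picks the entries of base at the given positions.
    static String[] pick(String[] base, int[] fields) {
        String[] out = new String[fields.length];
        for (int i = 0; i < fields.length; i++) {
            out[i] = base[fields[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        String[] schema = {"a", "b", "c", "d"};
        int[] inner = {0, 2, 3}; // select a, c, d from my_table
        int[] outer = {2, 3};    // select c, d, expressed as schema indexes

        // Schema-relative: every call resolves against the immutable schema.
        String[] afterInner = pick(schema, inner);
        String[] afterOuter = pick(schema, outer);
        System.out.println(Arrays.toString(afterInner)); // [a, c, d]
        System.out.println(Arrays.toString(afterOuter)); // [c, d]

        // Produced-type-relative: the same outer indexes no longer fit [a, c, d].
        try {
            pick(afterInner, outer);
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("index 3 is out of range for [a, c, d]");
        }
    }
}
```

Resolving against the schema gives the same answer no matter how many times the projection is applied, which is the property the advice above relies on.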