roland wang created FLINK-15283:
-----------------------------------

Summary: Scala version of TableSinkUtils has a problem when validating sinks.
Key: FLINK-15283
URL: https://issues.apache.org/jira/browse/FLINK-15283
Project: Flink
Issue Type: Bug
Components: API / Scala
Affects Versions: 1.9.0
Environment: All environments of Flink 1.9.0
Reporter: roland wang


*1. Phenomenon*

I created a Kafka sink with a schema like:
{code:java}
[BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
When I tried to insert some data into this sink, the following error occurred:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [TEST_SINK] do not match.
Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double]
TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
Now I have to keep the field order of the query schema exactly the same as the sink's schema, which causes a lot of trouble.

*2. Cause*

I checked the code and found these lines:
{code:java}
// validate schema of source table and table sink
val srcFieldTypes = query.getTableSchema.getFieldDataTypes
val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes

if (srcFieldTypes.length != sinkFieldTypes.length ||
  srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
    !PlannerTypeUtils.isInteroperable(
      fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
  }) {
...{code}
I think that when the sink's schema is compared to the query's schema, the zip goes wrong because the two schemas are not sorted first.

I truly hope this bug can be fixed soon.

Thanks for all your hard work.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
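To make the reported behaviour concrete, here is a minimal sketch of the difference between a positional comparison (what the `zip` in the quoted snippet does) and a comparison that sorts both schemas by field name first (what the reporter suggests). It does not use Flink at all: `Field`, `positionalMatch` and `nameBasedMatch` are hypothetical names invented for this illustration, and plain string equality stands in for `PlannerTypeUtils.isInteroperable`.

```scala
// Hypothetical stand-in for a schema field; this is not a Flink class.
final case class Field(name: String, dataType: String)

object SchemaCheckSketch {
  // Positional comparison, analogous to the zip in the quoted snippet:
  // field i of the query is compared with field i of the sink, names ignored.
  def positionalMatch(query: Seq[Field], sink: Seq[Field]): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case (q, s) => q.dataType == s.dataType }

  // Name-based comparison (an assumption about what "sorting both schemas"
  // would mean): sort both sides by field name, then compare pairwise.
  def nameBasedMatch(query: Seq[Field], sink: Seq[Field]): Boolean =
    query.length == sink.length &&
      query.sortBy(_.name).zip(sink.sortBy(_.name)).forall {
        case (q, s) => q.name == s.name && q.dataType == s.dataType
      }

  // The two schemas from the error message in the report.
  val querySchema: Seq[Field] = Seq(
    Field("ORDER_NO", "String"), Field("BAK_NO", "String"), Field("TRANS_AMT", "Double"))
  val sinkSchema: Seq[Field] = Seq(
    Field("BAK_NO", "String"), Field("TRANS_AMT", "Double"), Field("ORDER_NO", "String"))

  def main(args: Array[String]): Unit = {
    println(s"positional match: ${positionalMatch(querySchema, sinkSchema)}")
    println(s"name-based match: ${nameBasedMatch(querySchema, sinkSchema)}")
  }
}
```

With the schemas from the report, the positional check fails at the second position (String against Double), while the name-based check succeeds. Whether name-based matching is the right semantics for a SQL `INSERT INTO` is a separate question that this sketch does not settle.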
- I think you might have misunderstood the sink schema. In the table sink DDL, the schema is an ordered list of fields, not a JSON-like (name-to-value) mapping. At least that is the case in SQL.
- For example, suppose the DDL sink schema is <A String, B String, C String> and the SQL is: insert into table select A, C, B from t2.
- The A, B and C selected in the SQL are only validated against the fields defined in the sink DDL; that is all the check means.
- The query result is mapped to the sink by index, so here the result is A, C, B and not the A, B, C you expect.

  Is that a bug? I think so, for common SQL.
- I think this may be a result of Flink's memory optimization choices. Is this correct?

roland wang (Jira) <[hidden email]> wrote on Mon, 16 Dec 2019 at 17:59:
> [...]
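The index-based mapping described in this reply can be sketched as follows. This is a plain-Scala illustration under stated assumptions, not Flink's implementation: `writeByIndex` and `reorderForSink` are hypothetical helpers, and the lowercase strings are placeholder values standing in for the contents of columns A, B and C of `t2`.

```scala
object PositionalInsertSketch {
  // Models "mapping by index": the i-th value of the query result
  // is written to the i-th column of the sink, regardless of names.
  def writeByIndex(sinkColumns: Seq[String], row: Seq[String]): Seq[(String, String)] =
    sinkColumns.zip(row)

  // Hypothetical helper: reorder a query row so that its values follow the
  // sink's column order. Throws NoSuchElementException if a sink column
  // is missing from the query columns.
  def reorderForSink(
      sinkColumns: Seq[String],
      queryColumns: Seq[String],
      row: Seq[String]): Seq[String] = {
    val valueByName = queryColumns.zip(row).toMap
    sinkColumns.map(valueByName)
  }

  def main(args: Array[String]): Unit = {
    val sinkColumns = Seq("A", "B", "C")   // DDL order of the sink
    val queryColumns = Seq("A", "C", "B")  // order used in "select A, C, B"
    val row = Seq("a", "c", "b")           // one result row, in query order

    // Written as-is, the value of C lands in sink column B and vice versa.
    println(writeByIndex(sinkColumns, row))

    // Reordering the row first (equivalent to writing "select A, B, C")
    // lines the values up with the sink columns.
    println(writeByIndex(sinkColumns, reorderForSink(sinkColumns, queryColumns, row)))
  }
}
```

In the example from the reply all three columns are String, so an index-based write would silently swap B and C; in the original report the types differ, which is why the validation rejects the query instead.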
Hi jingjing,
Please leave a comment under the JIRA issue [1] to keep the discussion in one place.

Thanks,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15283

On Mon, 16 Dec 2019 at 23:00, jingjing bai <[hidden email]> wrote:
> [...]