[jira] [Created] (FLINK-15283) Scala version of TableSinkUtils has a problem when validating sinks.

Shang Yuanchun (Jira)
roland wang created FLINK-15283:
-----------------------------------

             Summary: Scala version of TableSinkUtils has a problem when validating sinks.
                 Key: FLINK-15283
                 URL: https://issues.apache.org/jira/browse/FLINK-15283
             Project: Flink
          Issue Type: Bug
          Components: API / Scala
    Affects Versions: 1.9.0
         Environment: All environments of flink 1.9.0
            Reporter: roland wang


*1. Phenomenon*

I created a Kafka sink with the following schema:
{code:java}
[BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
When I tried to insert some data into this sink, the following error occurred:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [TEST_SINK] do not match. Query result schema: [ORDER_NO: String, BAK_NO: String, TRANS_AMT: Double] TableSink schema: [BAK_NO: String, TRANS_AMT: Double, ORDER_NO: String]
{code}
Now I have to keep the query's fields in exactly the same order as the sink's schema, which causes a lot of trouble.

*2. Cause*

I checked the code and found these lines:
{code:java}
// validate schema of source table and table sink
val srcFieldTypes = query.getTableSchema.getFieldDataTypes
val sinkFieldTypes = sink.getTableSchema.getFieldDataTypes

if (srcFieldTypes.length != sinkFieldTypes.length ||
  srcFieldTypes.zip(sinkFieldTypes).exists { case (srcF, snkF) =>
    !PlannerTypeUtils.isInteroperable(
      fromDataTypeToLogicalType(srcF), fromDataTypeToLogicalType(snkF))
  }) {
...
{code}
I think that when the sink's schema is compared with the query's schema, the zip goes wrong because the fields are paired by position and neither schema is sorted first.
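
A minimal sketch of the suspected behavior, using plain (field name, type name) pairs rather than Flink's TableSchema. The object `SchemaCheckSketch` and the helpers `matchesByPosition` and `matchesByName` are hypothetical names introduced only for illustration; they are not part of Flink, and the name-based check is just the idea proposed in this report, not an existing API.

```scala
// Simplified stand-in for a table schema: ordered (fieldName, typeName) pairs.
// This is NOT Flink's TableSchema; all names below are hypothetical.
object SchemaCheckSketch {
  type Schema = Seq[(String, String)]

  // Positional check, similar in spirit to the zip-based validation quoted above:
  // the i-th query field type is compared with the i-th sink field type.
  def matchesByPosition(query: Schema, sink: Schema): Boolean =
    query.length == sink.length &&
      query.zip(sink).forall { case ((_, queryType), (_, sinkType)) =>
        queryType == sinkType
      }

  // Name-based check, as suggested in this report: sort both schemas by
  // field name before comparing them.
  def matchesByName(query: Schema, sink: Schema): Boolean =
    query.sortBy(_._1) == sink.sortBy(_._1)

  // The two schemas from the error message above.
  val querySchema: Schema =
    Seq("ORDER_NO" -> "String", "BAK_NO" -> "String", "TRANS_AMT" -> "Double")
  val sinkSchema: Schema =
    Seq("BAK_NO" -> "String", "TRANS_AMT" -> "Double", "ORDER_NO" -> "String")
}
```

With these schemas, `matchesByPosition(querySchema, sinkSchema)` is false because the second position pairs String with Double, while `matchesByName(querySchema, sinkSchema)` is true because both schemas contain the same named fields with the same types.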

I truly hope this bug can be fixed soon.

Thanks for all your hard work.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Re: [jira] [Created] (FLINK-15283) Scala version of TableSinkUtils has a problem when validating sinks.

jingjing bai
   - I think you might have misunderstood the sink schema. In a table sink
     DDL, the fields form an ordered list, not a JSON-like (name-to-value)
     mapping. At least that is the case in SQL.

   - For example, suppose the DDL sink schema is <A String, B String, C String>
     and the SQL is: insert into table select A, C, B from t2.

   - The names A, B and C selected in the SQL are only validated against the
     field names defined in the sink DDL. That is all they mean.

   - The result is mapped to the sink by index. Here, the result is A, C, B,
     not the A, B, C you expect.

Is that a bug? I think so, for ordinary SQL.

   - I think this may be the result of a memory optimization choice in
     Flink. Is this correct?
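
The index-based mapping described above can be sketched roughly as follows. This is only an illustration with plain Scala collections, not Flink code; `PositionalInsertSketch`, `mapByIndex` and `reorderToSink` are hypothetical names, and the sink columns A, B, C are taken from the example in this message.

```scala
// Illustration only: plain collections, no Flink classes involved.
object PositionalInsertSketch {
  // Sink columns in the order declared in the DDL.
  val sinkColumns: Seq[String] = Seq("A", "B", "C")

  // Maps the selected values to sink columns purely by index; the field
  // names used in the SELECT list play no role in the mapping.
  def mapByIndex(selected: Seq[(String, String)]): Map[String, String] =
    sinkColumns.zip(selected.map(_._2)).toMap

  // Hypothetical helper: reorders the selected fields to the sink's column
  // order, which is what rewriting the SELECT list by hand achieves.
  def reorderToSink(selected: Seq[(String, String)]): Seq[(String, String)] = {
    val byName = selected.toMap
    sinkColumns.map(name => name -> byName(name))
  }

  // Result of "select A, C, B": (field name, value) pairs in query order.
  val selected: Seq[(String, String)] = Seq("A" -> "a1", "C" -> "c1", "B" -> "b1")
}
```

Under these assumptions, `mapByIndex(selected)` puts the value of C into column B and the value of B into column C, whereas `mapByIndex(reorderToSink(selected))` writes each value to the column of the same name.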


In reply to roland wang (Jira) <[hidden email]>, Mon, 16 Dec 2019 at 5:59 PM


Re: [jira] [Created] (FLINK-15283) Scala version of TableSinkUtils has a problem when validating sinks.

Jark Wu-2
Hi jingjing,

Please leave your comment under the JIRA issue [1] to keep the discussion in
one place.

Thanks,
Jark

[1]: https://issues.apache.org/jira/browse/FLINK-15283

In reply to jingjing bai <[hidden email]>, Mon, 16 Dec 2019 at 23:00