Blink Planner HBaseUpsertTableSink Exception


Blink Planner HBaseUpsertTableSink Exception

LakeShen
Hi community, when I create an HBase sink table with Flink DDL SQL, just like this:

    create table sink_hbase_table (
        rowkey VARCHAR,
        cf row(kdt_it_count bigint)
    ) with (xxxxxx);
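
(For reference, a minimal sketch of what the elided with(...) clause usually contains for the Flink 1.9 HBase connector; the version, table name and ZooKeeper quorum below are placeholder values, not the actual job configuration:)

    create table sink_hbase_table (
        rowkey VARCHAR,
        cf row(kdt_it_count bigint)
    ) with (
        'connector.type' = 'hbase',
        'connector.version' = '1.4.3',                 -- placeholder version
        'connector.table-name' = 'my_hbase_table',     -- placeholder table name
        'connector.zookeeper.quorum' = 'zk-host:2181'  -- placeholder quorum
    );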

and when I run my Flink job, it throws an exception like this:

UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

Looking at the Flink source code, I find that in HBaseUpsertTableSink the setKeyFields method has an empty body, and in the StreamExecSink class I see this comment:

// TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]

But the current UpsertStreamTableSink setKeyFields interface is Array[String], which seems to conflict with that comment.
Could we use HBaseUpsertTableSink in our Flink job? Thanks for your reply.

Re: Blink Planner HBaseUpsertTableSink Exception

Jark Wu-2
Hi Lake,

This is not a problem of HBaseUpsertTableSink.
It happens because the query loses the primary key (e.g. concat(key1, key2) currently loses the primary key information [key1, key2]), while the insert validation requires that an upsert query has a primary key. That is why the exception is thrown.
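
(To illustrate, a rough sketch with made-up source table and column names, not taken from this thread: a query that groups by the sink's rowkey column keeps its grouping key as the inferred upsert key, while deriving the rowkey through a function such as CONCAT currently drops that information and triggers the exception above.)

    -- Made-up names (source_table, user_id, key1, key2); assumes user_id is VARCHAR.
    -- Grouping key == rowkey, so the upsert key is preserved:
    INSERT INTO sink_hbase_table
    SELECT user_id, ROW(COUNT(*)) FROM source_table GROUP BY user_id;

    -- Rowkey built with CONCAT: the key information [key1, key2] is lost,
    -- so the "full primary keys" check fails:
    INSERT INTO sink_hbase_table
    SELECT CONCAT(key1, key2), ROW(COUNT(*)) FROM source_table GROUP BY key1, key2;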

IMO, in order to fix this problem, we need to enrich the primary key inference to support all kinds of built-in functions/operators.
But that is a large piece of work, so it may not happen in 1.9.1.

Regards,
Jark


Re: Blink Planner HBaseUpsertTableSink Exception

LakeShen
Thank you, Jark. I had added a primary key in my Flink SQL before, and it threw "Primary key and unique key are not supported yet." Now I understand it; thank you sincerely for your reply.
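
(For context, a sketch of the DDL shape that produces that message; Flink 1.9 DDL does not yet accept PRIMARY KEY constraints, and the column names below are simply reused from the earlier example for illustration:)

    create table sink_hbase_table (
        rowkey VARCHAR,
        cf row(kdt_it_count bigint),
        PRIMARY KEY (rowkey)  -- rejected in Flink 1.9: "Primary key and unique key are not supported yet."
    ) with (xxxxxx);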

Best wishes,
LakeShen

