Hi community,

When I create an HBase sink table in my Flink DDL SQL, like this:

    create table sink_hbase_table(
        rowkey VARCHAR,
        cf row( kdt_it_count bigint )
    ) with (xxxxxx);

and run my Flink job, it throws this exception:

    UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:115)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.scala:50)
        at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:54)
        at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecSink.translateToPlan(StreamExecSink.scala:50)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:61)
        at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:60)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)

I looked at the Flink source code and found that in HBaseUpsertTableSink the method setKeyFields has an empty body. In the StreamExecSink class I saw this comment:

    //TODO UpsertStreamTableSink setKeyFields interface should be Array[Array[String]]

but the UpsertStreamTableSink setKeyFields interface currently takes Array[String], which seems to conflict with that comment.

Can we use HBaseUpsertTableSink in our Flink job? Thanks for your reply.
Hi Lake,
This is not a problem with HBaseUpsertTableSink. The query loses its primary key (for example, concat(key1, key2) currently loses the primary key information [key1, key2]), but the validation on insert checks that an upsert query has a primary key. That's why the exception is thrown.

IMO, to fix this we need to enrich the primary key inference to support all kinds of built-in functions and operators. But this is a large piece of work, which means it may not happen in 1.9.1.

Regards,
Jark

On Thu, 12 Sep 2019 at 14:39, LakeShen <[hidden email]> wrote:
> Could we use HBaseUpsertTableSink in our flink task? Thanks your reply.
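To make the point about key inference concrete, here is a toy model in Python. It is only a sketch under stated assumptions, not Flink's planner code: the names Column, Call, infer_output_key and validate_upsert are hypothetical, and the rule "a key column survives only when it is forwarded as a plain column reference" is a simplification of the behaviour described above.

```python
from dataclasses import dataclass
from typing import List, Optional, Set


@dataclass(frozen=True)
class Column:
    """A plain column reference, e.g. key1 (hypothetical helper type)."""
    name: str


@dataclass(frozen=True)
class Call:
    """A function call over input columns, e.g. concat(key1, key2)."""
    func: str
    args: tuple


def infer_output_key(input_key: Set[str], projection: List[object]) -> Optional[Set[int]]:
    """Return the output positions that form the key, or None if the key is lost.

    Toy rule (an assumption, not Flink's implementation): a key column is
    tracked only when the projection forwards it as a plain column reference.
    """
    positions = {}
    for i, expr in enumerate(projection):
        if isinstance(expr, Column) and expr.name in input_key:
            positions.setdefault(expr.name, i)
    if set(positions) == input_key:
        return set(positions.values())
    return None


def validate_upsert(key: Optional[Set[int]]) -> None:
    """Mimic the sink-side check that produced the message in the stack trace."""
    if key is None:
        raise ValueError(
            "UpsertStreamTableSink requires that Table has a full primary keys "
            "if it is updated."
        )


grouped_key = {"key1", "key2"}

# Key columns forwarded unchanged: the key can still be tracked.
forwarded = [Column("key1"), Column("key2"), Column("cnt")]
print(infer_output_key(grouped_key, forwarded))     # {0, 1}

# Key columns wrapped in a function call: the key information is lost.
concatenated = [Call("concat", (Column("key1"), Column("key2"))), Column("cnt")]
print(infer_output_key(grouped_key, concatenated))  # None
```

In this model, passing the second result to validate_upsert raises the same message as in the stack trace, even though the sink itself is not at fault.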
Thank you, Jark. I had added a primary key in my Flink SQL before, and it threw "Primary key and unique key are not supported yet." Now I understand. Thank you sincerely for your reply.

Best wishes,
LakeShen

On Thu, 12 Sep 2019 at 15:15, Jark Wu <[hidden email]> wrote:
> This is not a problem of HBaseUpsertTableSink.
> This is because the query loses primary key (e.g. concat(key1, key2) will
> lose the primary key information [key1, key2] currently.),
> but the validation of inserting checks the upsert query should have a
> primary key. That's why the exception is thrown.