UpsertStreamTableSink vs OverwritableTableSink

Gyula Fóra
Hi all!

While working on a Table Sink implementation for Kudu (key-value store),
we got a bit confused about the expected semantics of UpsertStreamTableSink
vs OverwritableTableSink, and of statements like INSERT vs INSERT OVERWRITE.

I am wondering which external operation each incoming record should
correspond to:
 - Insert (fail on duplicate row)
 - Delete
 - Upsert

Purely looking at the UpsertStreamTableSink, we either upsert or delete, so
every INSERT is pretty much an INSERT OVERWRITE. Still, we cannot use INSERT
OVERWRITE if we don't implement OverwritableTableSink.

So I wonder what implementing OverwritableTableSink is expected to do for
an UpsertStreamTableSink.

Can someone please help me clarify this so we can get it right?

Thanks
Gyula

Re: UpsertStreamTableSink vs OverwritableTableSink

Jingsong Li
Hi,

INSERT OVERWRITE comes from batch SQL in Hive.
It means overwriting the whole table/partition instead of overwriting per key.
So for "insert overwrite kudu_table", the sink should first delete the whole
table in Kudu and then insert the new data into it.
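
The difference between the two behaviours can be illustrated with a minimal
sketch. It assumes an in-memory map standing in for the Kudu table; the class
and method names (OverwriteVsUpsert, upsert, insertOverwrite) are hypothetical
and are not part of the Flink or Kudu APIs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration only: a map plays the role of the external table.
public class OverwriteVsUpsert {

    // Per-key upsert: keys present in newRows are inserted or updated,
    // all other existing rows are left untouched.
    static Map<String, Integer> upsert(Map<String, Integer> table,
                                       Map<String, Integer> newRows) {
        Map<String, Integer> result = new LinkedHashMap<>(table);
        result.putAll(newRows);
        return result;
    }

    // INSERT OVERWRITE: the previous contents are dropped first,
    // then the new rows are written.
    static Map<String, Integer> insertOverwrite(Map<String, Integer> table,
                                                Map<String, Integer> newRows) {
        Map<String, Integer> result = new LinkedHashMap<>(); // old rows discarded
        result.putAll(newRows);
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> table = new LinkedHashMap<>();
        table.put("a", 1);
        table.put("b", 2);

        Map<String, Integer> newRows = new LinkedHashMap<>();
        newRows.put("b", 20);
        newRows.put("c", 30);

        System.out.println(upsert(table, newRows));          // {a=1, b=20, c=30}
        System.out.println(insertOverwrite(table, newRows)); // {b=20, c=30}
    }
}
```

Row "a" survives the upsert but not the overwrite, which is the distinction
described above.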

The same semantics should be used in streaming jobs, but I don't know if
there are any requirements.

UpsertStreamTableSink is a way to deal with a changelog in streaming jobs:
the upsert and delete messages are produced by keyed and retracted streams.
This is not related to the "insert overwrite" grammar.
And FYI, the upsert stream sink will be unified into "DynamicTableSink" with
primary keys in DDL. [1]
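
How a sink might apply such a changelog can be sketched as follows. This is
a plain-Java illustration under stated assumptions, not the Flink interface
itself: the Change class and the apply method are hypothetical, and the
boolean flag only mirrors the convention that a true flag marks an upsert
message and a false flag marks a delete message for the key.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustration only: applies a keyed changelog to an in-memory table.
public class ChangelogSketch {

    // Hypothetical message type: upsert == true means "insert or update
    // this key", upsert == false means "delete this key".
    static final class Change {
        final boolean upsert;
        final String key;
        final long value;

        Change(boolean upsert, String key, long value) {
            this.upsert = upsert;
            this.key = key;
            this.value = value;
        }
    }

    // Applies the messages in order and returns the resulting table.
    static Map<String, Long> apply(List<Change> changelog) {
        Map<String, Long> table = new TreeMap<>();
        for (Change c : changelog) {
            if (c.upsert) {
                table.put(c.key, c.value); // insert or update by key
            } else {
                table.remove(c.key);       // delete by key
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
                new Change(true, "a", 1L),
                new Change(true, "b", 1L),
                new Change(true, "a", 2L),   // updates key "a"
                new Change(false, "b", 0L)); // deletes key "b"
        System.out.println(apply(changelog)); // {a=2}
    }
}
```

Every message touches exactly one key, so nothing here ever clears the whole
table; that only happens with the overwrite semantics.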

[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces#FLIP-95:NewTableSourceandTableSinkinterfaces-SinkInterfaces


Best,
Jingsong Lee

(In reply to Gyula Fóra's message of Wed, May 6, 2020 at 8:48 PM.)

Re: UpsertStreamTableSink vs OverwritableTableSink

Gyula Fóra
Thanks a lot for the detailed explanation, it makes complete sense.

Gyula

(In reply to Jingsong Li's message of Wed, May 6, 2020 at 3:53 PM.)