[DISCUSS] JDBC exactly-once sink

Roman Khachatryan
Hi everyone,

I'm currently working on an exactly-once JDBC sink implementation for Flink.
Any ideas and/or feedback are welcome.

I've considered the following options:
1. Two-phase commit. This is similar to the Kafka sink.
XA or a database-specific API can be used. With XA, each sink subtask
acts as a transaction manager, and each checkpoint-subtask pair corresponds
to an XA transaction (with a single branch).
2. Write-ahead log. This is similar to the Cassandra sink.
Transaction metadata needs to be stored in the database along with the data to
avoid adding duplicates after recovery.

For some scenarios, WAL might be better, but in general, XA seems to be a
better option.
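
As an illustration of the "one XA transaction per checkpoint-subtask pair" idea, here is a minimal sketch of how a transaction id could be derived deterministically. It is not Flink code: `make_xid`, `FORMAT_ID`, and the byte layout are assumptions made for this example. The only external fact used is that XA limits the global transaction id and the branch qualifier to 64 bytes each.

```python
import struct

FORMAT_ID = 0x464C4B      # hypothetical format id; any fixed integer works
MAX_XA_ID_BYTES = 64      # XA limits gtrid and bqual to 64 bytes each


def make_xid(job_id: bytes, subtask_index: int, checkpoint_id: int):
    """Build (format_id, gtrid, bqual) for one checkpoint-subtask pair."""
    # Global transaction id: job id, then subtask index (4 bytes) and
    # checkpoint id (8 bytes), both big-endian.
    gtrid = job_id + struct.pack(">iq", subtask_index, checkpoint_id)
    # Each transaction has a single branch, so a constant qualifier is enough.
    bqual = b"\x00"
    if len(gtrid) > MAX_XA_ID_BYTES:
        raise ValueError("gtrid exceeds the 64-byte XA limit")
    return FORMAT_ID, gtrid, bqual
```

Because the id depends only on the job, the subtask, and the checkpoint, a restarted subtask can recompute the ids of transactions it may have left behind.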

==================
XA vs WAL comparison
==================

1. Consistency: XA preferable
WAL: longer inconsistency windows when writing from several sink subtasks

2. Performance and efficiency: XA preferable (depends on the use case)
XA:
- long-running transactions may delay GC and may hold locks (depends on the
use case)
- databases/drivers may have XA implementation issues
WAL:
- double (de)serialization and IO (first to flink state, then to database)
- read-from-state and write-to-database spikes on checkpoint completion
Both: may cause read spikes in the consumer

3. Database support: XA preferable
XA: most popular RDBMS do support it (at least mysql, pgsql, mssql, oracle,
db2, sybase)
WAL: meta table DDL may differ

4. Operability: depends on the use case
XA:
- increased undo segment (db may need to maintain a view from the
transaction start)
- abandoned transactions cleanup (abandoned tx may cause starvation if for
example database blocks inserts of duplicates in different transactions)
- (jars aren't an issue - most drivers ship XA implementation)
WAL:
- increased intermediate flink state
- need to maintain meta table
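
The abandoned-transaction cleanup listed under XA above could be sketched roughly as follows. This is a simulation under stated assumptions, not a real driver: `FakeXaDatabase` is a hypothetical in-memory stand-in for an XA resource, and `recover_subtask` and the `owned_by_me` predicate are names invented for this example. The idea is that after a restart, prepared transactions recorded in the restored checkpoint are committed, and prepared transactions of the same subtask that are not in the restored state are rolled back.

```python
class FakeXaDatabase:
    """In-memory stand-in for an XA resource (hypothetical, for illustration)."""

    def __init__(self):
        self.prepared = {}    # xid -> rows staged by a prepared transaction
        self.committed = []   # rows visible to readers

    def prepare(self, xid, rows):
        self.prepared[xid] = list(rows)

    def recover(self):
        # Analogous to XA recover: list the xids of prepared transactions.
        return list(self.prepared)

    def commit(self, xid):
        self.committed.extend(self.prepared.pop(xid))

    def rollback(self, xid):
        self.prepared.pop(xid, None)


def recover_subtask(db, owned_by_me, xids_in_restored_state):
    """Finish or abort transactions left over from before a failure."""
    for xid in db.recover():
        if not owned_by_me(xid):
            continue              # belongs to another subtask or application
        if xid in xids_in_restored_state:
            db.commit(xid)        # its checkpoint succeeded, so the data must land
        else:
            db.rollback(xid)      # abandoned: not part of the restored checkpoint
```

Filtering by ownership matters here: rolling back every prepared transaction the database reports would also abort work that belongs to other subtasks or other applications.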

5. Simplicity: about the same
XA: more corner cases
WAL: state and meta table management
Both wrap writes into transactions
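
To make the WAL variant's meta table concrete, here is a small sketch that uses SQLite from the Python standard library as a stand-in for the target database. The table names (`sink_meta`, `events`) and the function names are assumptions made for this example. The records buffered for one checkpoint-subtask pair are written in the same local transaction as a meta-table row, so replaying the flush after recovery does not add duplicates.

```python
import sqlite3


def create_tables(conn):
    # Meta table: one row per (subtask, checkpoint) that was already flushed.
    conn.execute(
        "CREATE TABLE sink_meta ("
        " subtask INTEGER, checkpoint_id INTEGER,"
        " PRIMARY KEY (subtask, checkpoint_id))"
    )
    conn.execute("CREATE TABLE events (payload TEXT)")
    conn.commit()


def commit_checkpoint(conn, subtask, checkpoint_id, rows):
    """Flush buffered rows for one (subtask, checkpoint) pair at most once."""
    with conn:  # commits on success, rolls back if an exception escapes
        seen = conn.execute(
            "SELECT 1 FROM sink_meta WHERE subtask = ? AND checkpoint_id = ?",
            (subtask, checkpoint_id),
        ).fetchone()
        if seen:
            return False  # already applied before the failure; skip the replay
        conn.execute(
            "INSERT INTO sink_meta (subtask, checkpoint_id) VALUES (?, ?)",
            (subtask, checkpoint_id),
        )
        conn.executemany(
            "INSERT INTO events (payload) VALUES (?)", [(r,) for r in rows]
        )
        return True
```

The primary key on the meta table is what makes the check safe: even if two attempts race past the `SELECT`, only one of the meta inserts can succeed, and the other transaction rolls back together with its data.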

6. Testing: WAL preferable
XA requires MVCC and proper XA support (no jars needed for derby)

--
Regards,
Roman

Re: [DISCUSS] JDBC exactly-once sink

Jingsong Li
Thanks Roman for driving this.

Although the upsert sink can bring some consistency guarantees to the JDBC sink,
users also have a strong need for an append exactly-once sink.

+1 to using XA distributed transactions.

For WAL, in my experience, writing to JDBC in large
quantities when a checkpoint completes can easily put too much
pressure on the database and make it unstable.

For testing, maybe we need to add some XA tests to the e2e suite.

But I'm worried about the performance of XA. Sometimes using MySQL's XA
will lead to poor performance (of course, it may be related to usage).

Best,
Jingsong Lee

In reply to Roman Khachatryan's message of Mon, Jan 6, 2020 at 4:41 PM.

Re: [DISCUSS] JDBC exactly-once sink

Piotr Nowojski
Hi,

Also +1 for using XA. There might be scenarios where WAL could be a better option, but I think XA should be the default/first choice. If there is greater demand for WAL, we can always provide it as an alternative.

As Jingsong mentioned, with WAL I would be worried about batch-like workloads. Spamming the external database with all of the accumulated records once per checkpoint can easily lead to DDoS-like scenarios and external system crashes.

Jingsong, do we already have an upsert JDBC sink? I guess it's just in the Table API, right?

Piotrek

In reply to Jingsong Li's message of 6 Jan 2020, 10:43.


Re: [DISCUSS] JDBC exactly-once sink

Jingsong Li
Hi Piotr,

We already have "JDBCUpsertOutputFormat". It is mainly intended for the
Table/SQL API, but DataStream programs can use it to get upsert semantics.
However, it has weaker semantics than exactly-once and can only guarantee
eventual consistency.
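
The difference between the two guarantees can be illustrated with a small sketch. It uses SQLite from the Python standard library as a stand-in database and does not involve `JDBCUpsertOutputFormat` itself; the table names and the `replay` helper are assumptions made for this example. Replaying the same records after a failure leaves a keyed upsert table in the same final state, while a plain append table accumulates duplicates.

```python
import sqlite3


def build_tables(conn):
    conn.execute("CREATE TABLE appended (k TEXT, v INTEGER)")
    conn.execute("CREATE TABLE upserted (k TEXT PRIMARY KEY, v INTEGER)")


def replay(conn, records, times):
    """Write the same records `times` times, as a restart without
    exactly-once guarantees might do."""
    for _ in range(times):
        for key, value in records:
            # Plain append: every replay adds the row again.
            conn.execute(
                "INSERT INTO appended (k, v) VALUES (?, ?)", (key, value)
            )
            # Keyed upsert: a replayed row overwrites the existing one.
            conn.execute(
                "INSERT OR REPLACE INTO upserted (k, v) VALUES (?, ?)",
                (key, value),
            )
    conn.commit()


conn = sqlite3.connect(":memory:")
build_tables(conn)
replay(conn, [("a", 1), ("b", 2), ("a", 3)], times=2)
appended_rows = conn.execute("SELECT COUNT(*) FROM appended").fetchone()[0]
upserted_rows = conn.execute("SELECT k, v FROM upserted ORDER BY k").fetchall()
```

Note that during the second pass the value for key "a" briefly goes back to 1 before reaching 3 again, which is why the result is only eventually consistent: a reader in the middle of a replay can observe an older value.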

Best,
Jingsong Lee

In reply to Piotr Nowojski's message of Mon, Jan 6, 2020 at 8:07 PM.