Hi everyone,
I'm currently working on an exactly-once JDBC sink implementation for Flink.
Any ideas and/or feedback are welcome.

I've considered the following options:
1. Two-phase commit. This is similar to the Kafka sink.
XA or a database-specific API can be used. In the case of XA, each sink subtask acts as a transaction manager, and each checkpoint-subtask pair corresponds to an XA transaction (with a single branch).
2. Write-ahead log. This is similar to the Cassandra sink.
Transaction metadata needs to be stored in the database along with the data to avoid adding duplicates after recovery.

For some scenarios, WAL might be better, but in general, XA seems to be a better option.

==================
XA vs WAL comparison
==================

1. Consistency: XA preferable
WAL: longer inconsistency windows when writing from several sink subtasks

2. Performance and efficiency: XA preferable (depends on the use case)
XA:
- long-running transactions may delay GC and may hold locks (depends on the use case)
- databases/drivers may have XA implementation issues
WAL:
- double (de)serialization and IO (first to Flink state, then to the database)
- read-from-state and write-to-database spikes on checkpoint completion
Both may have read spikes in the consumer

3. Database support: XA preferable
XA: most popular RDBMS do support it (at least mysql, pgsql, mssql, oracle, db2, sybase)
WAL: meta table DDL may differ

4. Operability: depends on the use case
XA:
- increased undo segment (db may need to maintain a view from the transaction start)
- abandoned transactions cleanup (abandoned tx may cause starvation if, for example, the database blocks inserts of duplicates in different transactions)
- (jars aren't an issue - most drivers ship an XA implementation)
WAL:
- increased intermediate Flink state
- need to maintain the meta table

5. Simplicity: about the same
XA: more corner cases
WAL: state and meta table management
Both wrap writes into transactions

6. Testing: WAL preferable
XA requires MVCC and proper XA support (no jars needed for derby)

--
Regards,
Roman

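To make the "each checkpoint-subtask pair corresponds to an XA transaction (with a single branch)" idea concrete, here is a minimal sketch of how such a pair could be encoded into a `javax.transaction.xa.Xid`. The class name `CheckpointXid`, the format id, and the byte layout are all assumptions made for illustration; nothing in this thread specifies how the real implementation builds its transaction ids.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import javax.transaction.xa.Xid;

/** Hypothetical Xid for one (checkpoint, subtask) pair; not an actual Flink class. */
final class CheckpointXid implements Xid {
    // Arbitrary format id picked for this sketch only.
    private static final int FORMAT_ID = 0x464C4B;

    private final byte[] globalTransactionId;
    private final byte[] branchQualifier;

    CheckpointXid(long checkpointId, int subtaskIndex) {
        // Assumed layout: 8 bytes of checkpoint id followed by 4 bytes of subtask index.
        this.globalTransactionId = ByteBuffer.allocate(Long.BYTES + Integer.BYTES)
                .putLong(checkpointId)
                .putInt(subtaskIndex)
                .array();
        // One branch per transaction, so the qualifier can be a constant.
        this.branchQualifier = new byte[] {0};
    }

    @Override
    public int getFormatId() {
        return FORMAT_ID;
    }

    @Override
    public byte[] getGlobalTransactionId() {
        return globalTransactionId.clone();
    }

    @Override
    public byte[] getBranchQualifier() {
        return branchQualifier.clone();
    }

    /** Decodes the checkpoint id from the first 8 bytes. */
    long checkpointId() {
        return ByteBuffer.wrap(globalTransactionId).getLong();
    }

    /** Decodes the subtask index from the 4 bytes after the checkpoint id. */
    int subtaskIndex() {
        return ByteBuffer.wrap(globalTransactionId).getInt(Long.BYTES);
    }

    @Override
    public boolean equals(Object other) {
        if (!(other instanceof CheckpointXid)) {
            return false;
        }
        CheckpointXid that = (CheckpointXid) other;
        return Arrays.equals(globalTransactionId, that.globalTransactionId)
                && Arrays.equals(branchQualifier, that.branchQualifier);
    }

    @Override
    public int hashCode() {
        return 31 * Arrays.hashCode(globalTransactionId) + Arrays.hashCode(branchQualifier);
    }
}
```

One plausible mapping onto the standard `XAResource` calls (again an assumption, not something stated above) would be: `start(xid, ...)` when a subtask begins writing for a checkpoint, `end(xid, ...)` and `prepare(xid)` when the checkpoint snapshot is taken, `commit(xid, false)` once the checkpoint is confirmed, and `recover(...)` after a restart to find prepared transactions that still need to be committed or rolled back. A deterministic id like the one above is what would let a restarted subtask recognise its own prepared transactions.
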
Thanks Roman for driving this.
Although the upsert sink can bring some consistency guarantees to the JDBC sink, users also have strong requirements for an append-only exactly-once sink.

+1 to use XA distributed transactions.

For WAL, in my experience, writing to JDBC in large quantities when a checkpoint completes will easily put too much pressure on the database and make it unstable.

For testing, maybe we need to add some XA testing in e2e.

But I'm worried about the performance of XA. Sometimes using MySQL's XA will lead to poor performance (of course, it may be related to usage).

Best,
Jingsong Lee

On Mon, Jan 6, 2020 at 4:41 PM Roman Khachatryan wrote:
> [...]

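The write pressure at checkpoint completion described above can be illustrated with a toy model of a WAL-style sink. This is only a sketch: `WalSinkModel` and its method names are hypothetical, the "database" is just a list of batch sizes, and a real sink would keep the pending buffers in checkpointed Flink state rather than on the heap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a write-ahead-log sink; all names are hypothetical, not Flink API. */
final class WalSinkModel {
    // Records received since the last snapshot.
    private List<String> current = new ArrayList<>();
    // Sealed buffers keyed by checkpoint id, waiting for the checkpoint to be confirmed.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Sizes of the batches sent to the database, recorded to make the burst visible.
    final List<Integer> flushedBatchSizes = new ArrayList<>();

    /** Buffers the record; nothing is sent to the database yet. */
    void write(String record) {
        current.add(record);
    }

    /** Seals the current buffer under the given checkpoint id. */
    void snapshot(long checkpointId) {
        pending.put(checkpointId, current);
        current = new ArrayList<>();
    }

    /** Flushes every buffer up to and including the confirmed checkpoint in one go. */
    void notifyCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> ready = pending.headMap(checkpointId, true);
        int size = 0;
        for (List<String> batch : ready.values()) {
            size += batch.size();
        }
        if (size > 0) {
            flushedBatchSizes.add(size);
        }
        // Clearing the view also removes the flushed entries from the pending map.
        ready.clear();
    }
}
```

In this model, writing 1000 records between two checkpoints results in no database traffic at all until the checkpoint is confirmed, and then a single batch of 1000 rows. With XA, the same rows would reach the database as they arrive and only the commit would wait for the checkpoint.
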
Hi,
Also +1 for using XA. There might be scenarios where WAL could be a better option, but I think XA should be the default/first choice. If there is bigger demand for WAL, we can always provide it as an alternative.

As Jingsong mentioned, with WAL I would be worried about batch-like workloads. Spamming the external database with all of the accumulated records once per checkpoint can easily lead to DDoS-like scenarios and external system crashes.

Jingsong, do we already have an upsert JDBC sink? I guess it's just in the Table API, right?

Piotrek

> On 6 Jan 2020, at 10:43, Jingsong Li wrote:
> [...]

Hi Piotr,
We already have "JDBCUpsertOutputFormat". It was mainly proposed for the Table/SQL API, but DataStream can use it to get upsert semantics. However, it has weaker semantics than exactly-once and can only guarantee eventual consistency.

Best,
Jingsong Lee

On Mon, Jan 6, 2020 at 8:07 PM Piotr Nowojski wrote:
> [...]

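The difference between upsert and append semantics under replay can be sketched with plain collections. This does not use `JDBCUpsertOutputFormat` or any Flink API; `ReplayDemo` and its methods are hypothetical stand-ins for an append-only table and a table with a primary key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of replayed writes; each write is a {key, value} pair. */
final class ReplayDemo {
    /** Append-only table: every write adds a row, so replayed records become duplicates. */
    static List<String> appendAll(List<String[]> writes) {
        List<String> table = new ArrayList<>();
        for (String[] kv : writes) {
            table.add(kv[0] + "=" + kv[1]);
        }
        return table;
    }

    /** Upsert table keyed by primary key: a replayed write overwrites the same row. */
    static Map<String, String> upsertAll(List<String[]> writes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] kv : writes) {
            table.put(kv[0], kv[1]);
        }
        return table;
    }
}
```

If the writes `a=1, b=2` are replayed once after a failure, the append table ends up with four rows while the upsert table still holds two. The upsert result is only correct at the end, though: while a replay is in progress a reader may briefly see a key reset to an older value, which is the weaker guarantee mentioned above.
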