[jira] [Created] (FLINK-15728) JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement


Shang Yuanchun (Jira)
Bhagavan created FLINK-15728:
--------------------------------

             Summary: JDBCUpsertOutputFormat does not set bind parameter keyFields in updateStatement
                 Key: FLINK-15728
                 URL: https://issues.apache.org/jira/browse/FLINK-15728
             Project: Flink
          Issue Type: Bug
          Components: Connectors / JDBC
    Affects Versions: 1.9.1
            Reporter: Bhagavan


When JDBCUpsertOutputFormat is used with a custom dialect (e.g. H2 or Oracle) that relies on UpsertWriterUsingInsertUpdateStatement, the write fails with the error below.
{code:java}
Caused by: org.h2.jdbc.JdbcSQLDataException: Parameter "#6" is not set [90012-200]
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:590)
    at org.h2.message.DbException.getJdbcSQLException(DbException.java:429)
    at org.h2.message.DbException.get(DbException.java:205)
    at org.h2.message.DbException.get(DbException.java:181)
    at org.h2.expression.Parameter.checkSet(Parameter.java:83)
    at org.h2.jdbc.JdbcPreparedStatement.addBatch(JdbcPreparedStatement.java:1275)
    at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter$UpsertWriterUsingInsertUpdateStatement.processOneRowInBatch(UpsertWriter.java:233)
    at org.apache.flink.api.java.io.jdbc.writer.UpsertWriter.executeBatch(UpsertWriter.java:111)
{code}
This happens because UpsertWriterUsingInsertUpdateStatement#processOneRowInBatch does not set all bind parameters in the update case.

This bug does not surface when using Derby DB.
 If we replace Derby with H2 in JDBCUpsertOutputFormatTest, we can reproduce the bug.

The fix is trivial. Happy to raise a PR.
{code:java}
// For the update case, replace the call below
setRecordToStatement(updateStatement, fieldTypes, row);
// with (pseudocode: "+" denotes concatenation)
setRecordToStatement(updateStatement, fieldTypes + pkTypes, row + pkRow);
// NOTE: the prepared updateStatement contains an additional WHERE clause, so we
// need to pass the additional bind values and their SQL types as well.
{code}
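
A minimal sketch of what the concatenation in the pseudocode above could look like, assuming the update statement has the shape UPDATE t SET f1 = ?, ..., fn = ? WHERE k1 = ? AND ... (all fields first, then the key fields). The class name, the method names, and the keyIndexes parameter are hypothetical and are not part of Flink's API; the code only shows how the extra bind values and SQL types might be derived from the row.

```java
import java.sql.Types;
import java.util.Arrays;

/** Hypothetical helper, not part of Flink: builds bind parameters for an UPDATE with a WHERE clause. */
final class UpdateBindParamsSketch {

    /** Returns the SQL types of all fields followed by the SQL types of the key fields. */
    static int[] updateTypes(int[] fieldTypes, int[] keyIndexes) {
        int[] out = Arrays.copyOf(fieldTypes, fieldTypes.length + keyIndexes.length);
        for (int i = 0; i < keyIndexes.length; i++) {
            out[fieldTypes.length + i] = fieldTypes[keyIndexes[i]];
        }
        return out;
    }

    /** Returns all row values followed by the key values needed by the WHERE clause. */
    static Object[] updateValues(Object[] row, int[] keyIndexes) {
        Object[] out = Arrays.copyOf(row, row.length + keyIndexes.length);
        for (int i = 0; i < keyIndexes.length; i++) {
            out[row.length + i] = row[keyIndexes[i]];
        }
        return out;
    }

    public static void main(String[] args) {
        // Assumed example schema: (id INTEGER, name VARCHAR, score DOUBLE), keyed on id.
        int[] fieldTypes = {Types.INTEGER, Types.VARCHAR, Types.DOUBLE};
        Object[] row = {1, "alice", 3.5};
        int[] keyIndexes = {0};

        // Three SET parameters plus one WHERE parameter.
        System.out.println(Arrays.toString(updateValues(row, keyIndexes))); // prints [1, alice, 3.5, 1]
        System.out.println(updateTypes(fieldTypes, keyIndexes).length);     // prints 4
    }
}
```

With arrays built this way, every placeholder in the update statement, including those in the WHERE clause, gets a value before addBatch is called.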



--
This message was sent by Atlassian Jira
(v8.3.4#803005)