mzz created FLINK-18652:
---------------------------
Summary: JDBCAppendTableSink to ClickHouse (data always repeating)
Key: FLINK-18652
URL: https://issues.apache.org/jira/browse/FLINK-18652
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.10.0
Reporter: mzz
Hi all,
The data flow is: Kafka -> Flink SQL -> ClickHouse.
The window is 15 minutes, but once the first 15 minutes have passed, the same data keeps being sunk to ClickHouse repeatedly. Please help, thanks.
{code:java}
// data source from kafka
streamTableEnvironment.sqlUpdate(createTableSql)
LOG.info("kafka source table has created !")
val groupTable = streamTableEnvironment.sqlQuery(tempSql)
streamTableEnvironment.createTemporaryView("aggs_temp_table", groupTable)
// this is the window SQL; it uses processing time
val re_table = streamTableEnvironment.sqlQuery(windowSql)
re_table.printSchema()
// groupTable.printSchema()
val rr = streamTableEnvironment.toAppendStream[Result](re_table)
// the data printed here looks normal
rr.print()
streamTableEnvironment.createTemporaryView("result_table", rr)
val s = streamTableEnvironment.sqlQuery(sql)
// sink to clickhouse
val sink: JDBCAppendTableSink = JDBCAppendTableSink.builder()
  .setDrivername("ru.yandex.clickhouse.ClickHouseDriver")
  .setDBUrl(URL)
  .setQuery(insertCKSql)
  .setUsername(USERNAME)
  .setPassword(PASSWORD)
  .setBatchSize(10000)
  // one type per placeholder in insertCKSql, in column order
  .setParameterTypes(
    Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
    Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
    Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG
  )
  .build()
streamTableEnvironment.registerTableSink(
  "ckResult",
  Array[String](
    "data_date", "point", "platform", "page_name", "component_name", "booth_name",
    "position1", "advertiser", "adv_code", "request_num", "return_num", "fill_rate",
    "expose_num", "expose_rate", "click_num", "click_rate", "ecpm", "income", "created_at"),
  Array[TypeInformation[_]](
    Types.LONG, Types.LONG, Types.STRING, Types.STRING, Types.STRING, Types.STRING,
    Types.STRING, Types.STRING, Types.STRING, Types.LONG, Types.LONG, Types.FLOAT,
    Types.LONG, Types.FLOAT, Types.LONG, Types.FLOAT, Types.FLOAT, Types.FLOAT, Types.LONG),
  sink)
// insert into TableSink
s.insertInto("ckResult")
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)