Spongebob created FLINK-22874:
---------------------------------
Summary: Flink table partition commit trigger does not take effect as expected when sinking into a Hive table
Key: FLINK-22874
URL: https://issues.apache.org/jira/browse/FLINK-22874
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.13.1
Reporter: Spongebob
I am trying to sink into a Hive partitioned table whose partition commit trigger is declared as "partition-time", and I have assigned watermarks on the DataStream. When I feed data into the DataStream, the partitions are never committed on time. Here's my code:
{code:java}
-- DDL of the Hive table
create table test_table (username string)
partitioned by (ts bigint)
stored as orc
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);{code}
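For context, the filesystem/Hive sink documentation pairs 'partition-time' with two companion options that the DDL above leaves at their defaults: 'partition.time-extractor.timestamp-pattern', which tells the sink how to turn partition values into a partition time (the default expects a 'yyyy-MM-dd HH:mm:ss' formatted value, which a bare epoch-seconds bigint like ts does not match), and 'sink.partition-commit.delay'. Below is a minimal sketch of declaring them from the Scala side, reusing the streamTableEnv created in the application code further down; the pattern and delay values are illustrative assumptions, not the reporter's settings:
{code:java}
import org.apache.flink.table.api.SqlDialect

// Sketch only, not the reporter's DDL: the same table with the documented
// companion options for 'partition-time' committing spelled out. The
// timestamp-pattern and delay values are assumptions for illustration.
streamTableEnv.getConfig.setSqlDialect(SqlDialect.HIVE) // Hive syntax for STORED AS ORC
streamTableEnv.executeSql(
  """create table test_table (username string)
    |partitioned by (ts bigint)
    |stored as orc
    |tblproperties (
    |  'sink.partition-commit.trigger' = 'partition-time',
    |  'partition.time-extractor.timestamp-pattern' = '$ts',
    |  'sink.partition-commit.delay' = '0s',
    |  'sink.partition-commit.policy.kind' = 'metastore,success-file'
    |)""".stripMargin)
{code}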
{code:java}
// Flink application code (Scala)
import java.time.Duration
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.EnvironmentSettings
import org.apache.flink.util.Collector

val streamEnv = ...
val dataStream: DataStream[(String, Long)] = ...

// Emit each record together with the operator's current watermark so the
// watermark can be inspected downstream.
class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
  override def processElement(value: (String, Long),
                              ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context,
                              out: Collector[(String, Long, Long)]): Unit = {
    out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
  }
}

// Assign timestamps and watermarks; the second tuple field is an epoch
// timestamp in seconds, hence the * 1000.
val resultStream = dataStream
  .assignTimestampsAndWatermarks(
    WatermarkStrategy
      .forBoundedOutOfOrderness[(String, Long)](Duration.ZERO)
      .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
        override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
          element._2 * 1000
      }))
  .process(new MyProcessFunction)

// Convert the DataStream into a catalog table and sink it into Hive.
val streamTableEnv = buildStreamTableEnv(streamEnv,
  EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())
streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
val catalog = ...
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql(
  "insert into test_table select _1, _2 from default_catalog.default_database.test_catalog_t").print()
// The job runs with the default parallelism of 4.
{code}
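With the default parallelism of 4 and only the handful of records shown below, some source subtasks may never receive a record; the combined watermark then stays at Long.MIN_VALUE, which by itself keeps a partition-time trigger from ever firing. If that is what is happening here, WatermarkStrategy#withIdleness is the documented way to stop silent subtasks from holding the watermark back; a minimal sketch (the 5-second timeout is an assumption):
{code:java}
// Sketch, assuming the idle-subtask explanation applies: mark a subtask idle
// after 5 seconds of silence so it no longer holds back the combined watermark.
val strategyWithIdleness = WatermarkStrategy
  .forBoundedOutOfOrderness[(String, Long)](Duration.ZERO)
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
    override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long =
      element._2 * 1000
  })
  .withIdleness(Duration.ofSeconds(5)) // illustrative timeout, not a recommendation
{code}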
{code:java}
// input data
(a, 1)
(b, 2)
(c, 3)
(d, 4)
(a, 5)
...{code}
Result: many partition directories show up on HDFS, but all of them contain only in-progress files and are never committed to the Hive metastore.
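Incidentally, MyProcessFunction already captures the operator's current watermark in the third tuple field, so whether the watermark ever advances can be checked directly before looking at the sink; a sketch (the column alias is mine):
{code:java}
// Sketch: print the watermark captured by MyProcessFunction. If it stays at
// -9223372036854775808 (Long.MIN_VALUE), no partition-time commit can happen.
streamTableEnv.executeSql(
  "select _1, _2, _3 as captured_watermark from default_catalog.default_database.test_catalog_t"
).print()
{code}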