[jira] [Created] (FLINK-22874) flink table partition trigger doesn't take effect as expected when sinking into a Hive table


Shang Yuanchun (Jira)
Spongebob created FLINK-22874:
---------------------------------

             Summary: flink table partition trigger doesn't take effect as expected when sinking into a Hive table
                 Key: FLINK-22874
                 URL: https://issues.apache.org/jira/browse/FLINK-22874
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / API
    Affects Versions: 1.13.1
            Reporter: Spongebob


I am trying to sink into a Hive partitioned table whose partition commit trigger is declared as "partition-time", and I have assigned a watermark on the DataStream. When I input some data into the DataStream, the Hive partitions are not committed on time. Here's my code:
{code:java}
//ddl of hive table
create table test_table(username string)
partitioned by (ts bigint)
stored as orc
TBLPROPERTIES (
  'sink.partition-commit.trigger'='partition-time',
  'sink.partition-commit.policy.kind'='metastore,success-file'
);{code}
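For context on what the trigger is expected to do: with "partition-time", a partition should be committed once the watermark passes the time extracted from the partition values plus the configured commit delay ('sink.partition-commit.delay', 0 by default). A minimal sketch of that condition (the names here are illustrative, not Flink's internal API):

```java
// Illustrative check of the partition-time commit condition.
// The names are assumptions for this sketch, not Flink's internal API.
public class PartitionCommitSketch {
    static boolean isCommittable(long watermarkMs, long partitionTimeMs, long commitDelayMs) {
        return watermarkMs >= partitionTimeMs + commitDelayMs;
    }

    public static void main(String[] args) {
        // Partition for epoch second 3 (3000 ms), no commit delay: committable.
        System.out.println(isCommittable(4000L, 3000L, 0L));          // true
        // Watermark still at its initial Long.MIN_VALUE: never committable.
        System.out.println(isCommittable(Long.MIN_VALUE, 3000L, 0L)); // false
    }
}
```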
{code:java}
// flink application code

val streamEnv = ...
val dataStream: DataStream[(String, Long)] = ...


// assign watermark and output watermark info in processFunction
class MyProcessFunction extends ProcessFunction[(String, Long), (String, Long, Long)] {
  override def processElement(value: (String, Long), ctx: ProcessFunction[(String, Long), (String, Long, Long)]#Context, out: Collector[(String, Long, Long)]): Unit = {
    out.collect((value._1, value._2, ctx.timerService().currentWatermark()))
  }
}

val resultStream = dataStream
.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[(String, Long)](Duration.ZERO)
  .withTimestampAssigner(new SerializableTimestampAssigner[(String, Long)] {
    override def extractTimestamp(element: (String, Long), recordTimestamp: Long): Long = {
      element._2 * 1000
    }
  }))
.process(new MyProcessFunction)

// build the blink stream table environment
val streamTableEnv = buildStreamTableEnv(streamEnv, EnvironmentSettings.newInstance().inStreamingMode().useBlinkPlanner().build())

// convert dataStream into hive catalog table and sink into hive
streamTableEnv.createTemporaryView("test_catalog_t", resultStream)
val catalog = ...
streamTableEnv.registerCatalog("hive", catalog)
streamTableEnv.useCatalog("hive")
streamTableEnv.executeSql("insert into test_table select _1, _2 from default_catalog.default_database.test_catalog_t").print()


{code}

Flink uses the default parallelism of 4.

Input data:
{code:java}
(a, 1)
(b, 2)
(c, 3)
(d, 4)
(a, 5)
 ...
{code}

Result: many partition directories are created on HDFS, but they all contain only in-progress files and are never committed to the Hive metastore.
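One detail that may matter given the default parallelism of 4: a downstream operator's watermark is the minimum of the watermarks of all its input subtasks, so if only a handful of records are fed in and some subtasks receive no data, the combined watermark stays at its initial Long.MIN_VALUE and no partition can ever satisfy the commit condition. A self-contained sketch of that min semantics (illustrative only, not Flink's actual classes):

```java
import java.util.Arrays;

// Downstream operators combine input watermarks by taking the minimum.
// This sketch only illustrates the semantics, not Flink's actual classes.
public class WatermarkMinSketch {
    static long combined(long[] subtaskWatermarks) {
        return Arrays.stream(subtaskWatermarks).min().getAsLong();
    }

    public static void main(String[] args) {
        // All 4 subtasks have seen data: the combined watermark advances.
        System.out.println(combined(new long[]{5000L, 4000L, 6000L, 4500L})); // 4000
        // One idle subtask still at Long.MIN_VALUE pins the combined watermark.
        System.out.println(combined(new long[]{5000L, 4000L, 6000L, Long.MIN_VALUE})); // Long.MIN_VALUE
    }
}
```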



--
This message was sent by Atlassian Jira
(v8.3.4#803005)