[jira] [Created] (FLINK-17348) Expose metric group to ascendingTimestampExtractor


Shang Yuanchun (Jira)
Theo Diefenthal created FLINK-17348:
---------------------------------------

             Summary: Expose metric group to ascendingTimestampExtractor
                 Key: FLINK-17348
                 URL: https://issues.apache.org/jira/browse/FLINK-17348
             Project: Flink
          Issue Type: Improvement
            Reporter: Theo Diefenthal


A common use case with Flink and Kafka is to have many Kafka partitions, each with ascending timestamps.

In my scenario, for various operational reasons, we load log files from the filesystem into Kafka, one server per partition, and then consume them in Flink.

Sometimes we collect the files into Kafka in the wrong order, which leads to ascending-timestamp violations. If that happens while the default logging violation handler is enabled, we produce several GB of logs in a very short time, which we would like to avoid.

What we really want: to track the number of violations in a metric and define an alarm on it in our monitoring dashboard.

Currently, there is unfortunately no way to access the metric group from the ascending timestamp extractor. I wish there were something similar to the open method on other rich functions.

My current workaround is to add a custom map task directly after the source. For that task I need to pass on the Kafka partition from the source, which I usually don't care about, and I need to keep track of each partition's current timestamp manually, exactly the same way the extractor does. -> The workaround "pollutes" my pipeline quite a bit just for a single metric.
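
For illustration, the bookkeeping that such a map task has to duplicate could look roughly like the sketch below. It is plain Java without Flink dependencies, and the class name PartitionViolationTracker and its methods are hypothetical, not part of Flink. The long field stands in for a Flink Counter, which a rich function could presumably register in open() via getRuntimeContext().getMetricGroup().counter(...). Like the extractor, the sketch treats equal timestamps as valid and only counts strictly lower ones.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper sketching the workaround: remembers the highest
 * timestamp seen per Kafka partition and counts records that arrive
 * with a lower timestamp than that maximum.
 */
final class PartitionViolationTracker {

    // Highest timestamp observed so far, keyed by Kafka partition id.
    private final Map<Integer, Long> maxTimestampPerPartition = new HashMap<>();

    // Stand-in for a Flink Counter (assumption: it would be registered in open()).
    private long violationCount = 0;

    /**
     * Records one element and returns true if it violates ascending order
     * within its partition, i.e. its timestamp is lower than the
     * partition's current maximum.
     */
    boolean record(int partition, long timestamp) {
        Long currentMax = maxTimestampPerPartition.get(partition);
        if (currentMax != null && timestamp < currentMax) {
            violationCount++;
            return true;
        }
        // Equal or higher timestamps are fine and become the new maximum.
        maxTimestampPerPartition.put(partition, timestamp);
        return false;
    }

    long getViolationCount() {
        return violationCount;
    }
}
```

Note that the partition id has to reach the map task as part of each record, which is the pipeline "pollution" described above. With access to the metric group inside the extractor, neither the extra field nor the duplicated per-partition state would be needed.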



--
This message was sent by Atlassian Jira
(v8.3.4#803005)