How to register and send custom metrics from Flink?

Bowen Li
Hi guys,
    I'm trying to send some app-related custom metrics from Flink to
Datadog via StatsD.

    I followed https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html
to set up flink-conf.yaml and wrote test code like this:

# flink-conf.yaml

metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125

metrics.scope.jm: bowen.jobmanager
metrics.scope.jm.job: bowen.jobmanager.bowen.job_name
metrics.scope.tm: bowen.taskmanager.<tm_id>
metrics.scope.tm.job: bowen.taskmanager.<tm_id>.<job_name>
metrics.scope.task: bowen.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
metrics.scope.operator: bowen.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>
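To know what name to search for in Datadog, it helps to see how an operator-level metric name is put together: the variables in the operator scope format are filled in, and the user-defined groups and the metric name are appended, joined by `.`. The sketch below is a simplified illustration of that composition, not Flink's own code; the class `MetricIdentifierSketch` and the runtime values (`tm-1`, `WordCount`, `Tokenizer`, `0`) are hypothetical, and the real StatsD reporter may additionally filter or replace characters before sending.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MetricIdentifierSketch {

    // Substitutes each <variable> in the scope format, then appends the
    // user-defined groups and the metric name, all joined with '.'.
    // Simplified illustration only; not Flink's implementation.
    static String identifier(String scopeFormat, Map<String, String> variables,
                             List<String> userGroups, String metricName) {
        String scope = scopeFormat;
        for (Map.Entry<String, String> e : variables.entrySet()) {
            scope = scope.replace(e.getKey(), e.getValue());
        }
        StringBuilder sb = new StringBuilder(scope);
        for (String group : userGroups) {
            sb.append('.').append(group);
        }
        return sb.append('.').append(metricName).toString();
    }

    public static void main(String[] args) {
        String operatorScope =
            "bowen.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>";

        // Hypothetical runtime values, for illustration only.
        Map<String, String> vars = new LinkedHashMap<>();
        vars.put("<tm_id>", "tm-1");
        vars.put("<job_name>", "WordCount");
        vars.put("<operator_name>", "Tokenizer");
        vars.put("<subtask_index>", "0");

        // Counter registered via addGroup("bowen.test").counter("bowen.flink.metric")
        System.out.println(identifier(operatorScope, vars,
            Arrays.asList("bowen.test"), "bowen.flink.metric"));
        // prints bowen.taskmanager.tm-1.WordCount.Tokenizer.0.bowen.test.bowen.flink.metric
    }
}
```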


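// test code, modified from the WordCount example
// (imports needed in the enclosing WordCount file)
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.util.Collector;

public static final class Tokenizer
      extends RichFlatMapFunction<String, Tuple2<String, Integer>> {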
   private static final long serialVersionUID = 1L;

   private Counter counter;

   @Override
   public void open(Configuration config) {
      getRuntimeContext()
         .getMetricGroup()
         .addGroup("bowen.test")
         .gauge("bowen.test.flink", new Gauge<Integer>() {
            @Override
            public Integer getValue() {
               return 100;
            }
         });  // test custom metrics

      counter = getRuntimeContext()
         .getMetricGroup()
         .addGroup("bowen.test")
         .counter("bowen.flink.metric"); // test custom metrics
      counter.inc(100);
   }

   @Override
   public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
         throws Exception {
      // normalize and split the line
      String[] tokens = value.toLowerCase().split("\\W+");

      // emit the pairs
      for (String token : tokens) {
         if (token.length() > 0) {
            out.collect(new Tuple2<String, Integer>(token, 1));
         }
      }

      counter.inc(100);
   }
}

      I found that Datadog received all system-scope metrics, but none of my
custom metrics. I researched all night but made no progress. What did I do
wrong? Flink is able to handle custom metrics, right? I'd really appreciate
some guidance on sending custom metrics!

     Thank you very much!
Bowen

Re: How to register and send custom metrics from Flink?

Chesnay Schepler
Hello,

Yes, Flink can handle custom metrics. It is quite odd that you can see
the system metrics but not your own; I don't see a problem with your code.

How long is the job running? Since you register the metrics in the flatMap
operator, they are only exposed while that operator is running. Reporters
send metrics periodically (every 10 seconds by default, for all reporters),
so if the operator finishes in less than 10 seconds, your metrics may never
be reported.
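A rough way to see why a job of a few milliseconds may never show up: model the reporter as firing at fixed intervals and count how many of those report ticks land while the metric is registered. The model here is an assumption for illustration, not Flink's scheduling code: ticks are strictly periodic, the first tick comes one full interval after the reporter starts, the reporter's schedule is independent of when the job starts, and a metric is included in every tick from its registration up to (but not including) its removal. `ReportWindowSketch` is a hypothetical name.

```java
public class ReportWindowSketch {

    // Ceiling division for longs (Math.ceilDiv only exists since Java 18).
    static long ceilDiv(long a, long b) {
        return -Math.floorDiv(-a, b);
    }

    // Assumed model: ticks at reporterStartMs + k * intervalMs, k = 1, 2, ...
    // A metric registered at registeredMs and removed at unregisteredMs is
    // included in every tick t with registeredMs <= t < unregisteredMs.
    static long reportsDuringLifetime(long reporterStartMs, long intervalMs,
                                      long registeredMs, long unregisteredMs) {
        if (unregisteredMs <= registeredMs) {
            return 0;
        }
        // Index of the first tick at or after registration (k = 0 is not a tick).
        long firstK = Math.max(1, ceilDiv(registeredMs - reporterStartMs, intervalMs));
        // Index of the last tick strictly before removal.
        long lastK = Math.floorDiv(unregisteredMs - 1 - reporterStartMs, intervalMs);
        return Math.max(0, lastK - firstK + 1);
    }

    public static void main(String[] args) {
        // ~20 ms job with the default 10 s interval: no tick falls in the window.
        System.out.println(reportsDuringLifetime(0, 10_000, 12_345, 12_365)); // 0
        // Same 20 ms job, but it happens to straddle the tick at 20 000 ms.
        System.out.println(reportsDuringLifetime(0, 10_000, 19_990, 20_010)); // 1
        // 5 s job with a 1 s interval: ticks at 3000, 4000, ..., 7000 ms.
        System.out.println(reportsDuringLifetime(0, 1_000, 2_500, 7_500));    // 5
    }
}
```

Under this model, once the reporter has been running for a while, a metric that stays registered for L ms is seen by at least L / interval reports (integer division) wherever the ticks fall, which is why both running the job longer and lowering the reporting interval make the metrics appear.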

Regards,
Chesnay

(In reply to Bowen Li, 13.03.2017 08:48)


Re: How to register and send custom metrics from Flink?

Bowen Li
Hi Chesnay,
    You saved my day! Yes, the WordCount example
<https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java>
runs too fast, about 20 ms. I extended its running time and also set
'metrics.reporter.stsd.interval: 1'. Guess what?! The metrics show up in
Datadog!

    I will open a PR to add this tip to the Metrics page
<https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html>
to help other newbies ramp up faster.

    Thank you!
Bowen


(In reply to Chesnay Schepler, Mon, Mar 13, 2017 at 1:48 AM)