Hi guys,

I'm trying to send some app-related custom metrics from Flink to Datadog via StatsD.

I followed https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html to set up flink-conf.yaml and test code like this:

// flink-conf.yaml

    metrics.reporters: stsd
    metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
    metrics.reporter.stsd.host: localhost
    metrics.reporter.stsd.port: 8125

    metrics.scope.jm: bowen.jobmanager
    metrics.scope.jm.job: bowen.jobmanager.bowen.job_name
    metrics.scope.tm: bowen.taskmanager.<tm_id>
    metrics.scope.tm.job: bowen.taskmanager.<tm_id>.<job_name>
    metrics.scope.task: bowen.taskmanager.<tm_id>.<job_name>.<task_name>.<subtask_index>
    metrics.scope.operator: bowen.taskmanager.<tm_id>.<job_name>.<operator_name>.<subtask_index>

// test code, by modifying WordCount example

    public static final class Tokenizer extends RichFlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        private Counter counter;

        @Override
        public void open(Configuration config) {
            getRuntimeContext()
                .getMetricGroup()
                .addGroup("bowen.test")
                .gauge("bowen.test.flink", new Gauge<Integer>() {
                    @Override
                    public Integer getValue() {
                        return 100;
                    }
                }); // test custom metrics

            counter = getRuntimeContext()
                .getMetricGroup()
                .addGroup("bowen.test")
                .counter("bowen.flink.metric"); // test custom metrics
            counter.inc(100);
        }

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }

            counter.inc(100);
        }
    }

I found that Datadog received all the system scope metrics, but none of my custom metrics. I researched all night but made no progress. What did I do wrong? Flink is able to handle custom metrics, right?

I'd really appreciate some guidance on sending custom metrics!

Thank you very much!
Bowen
Hello,
yes, Flink can handle custom metrics. It is quite odd that you can see the system metrics but not your own; I don't see a problem with your code.

How long does the job run? Since you create a metric for the flatMap operation, the metric will only be exposed as long as the flatMap operation is active. Thus, if this operation takes less than 10 seconds (the default interval for all reporters), it may never be reported.

Regards,
Chesnay

On 13.03.2017 08:48, Bowen Li wrote:
> I found my Datadog received all system scope metrics, but none of my
> custom metrics. I researched all night but gained no progress. What did I do
> wrong? Flink is able to handle custom metrics right?
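
The timing argument above can be illustrated with a minimal sketch in plain Java. This is not Flink code: it only assumes a reporter that fires at fixed multiples of its interval and can report a metric only while that metric is still registered. The class name, the method name, and the 20 ms and 25 s lifetimes are hypothetical values chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not part of Flink): a reporter that polls at a fixed interval.
class ReportIntervalSketch {

    /**
     * Returns the reporter firing times (ms since reporter start) at which a metric
     * is visible, assuming the reporter fires at interval, 2 * interval, ... and the
     * metric exists only during [registeredAtMs, removedAtMs).
     */
    public static List<Long> visibleReports(long registeredAtMs, long removedAtMs, long intervalMs) {
        List<Long> reports = new ArrayList<>();
        for (long t = intervalMs; t < removedAtMs; t += intervalMs) {
            if (t >= registeredAtMs) {
                reports.add(t);
            }
        }
        return reports;
    }

    public static void main(String[] args) {
        // Operator alive for 20 ms with a 10 s interval: no firing falls inside its lifetime.
        System.out.println(visibleReports(0, 20, 10_000));      // prints []
        // Operator alive for 25 s: the firings at 10 s and 20 s both see the metric.
        System.out.println(visibleReports(0, 25_000, 10_000));  // prints [10000, 20000]
    }
}
```

Under this model there are two ways to get a short-lived operator's metric reported: keep the operator alive longer than one interval, or shorten the interval.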
Hi Chesnay,
You saved my day! Yes, the WordCount example <https://github.com/apache/flink/blob/master/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCount.java> runs too fast, about 20ms. I extended its running time, and also set 'metrics.reporter.stsd.interval: 1'. Guess what?! Metrics show up in Datadog!

I will open a PR to add this tip to the Metrics page <https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html> to help other newbies ramp up faster.

Thank you!
Bowen

On Mon, Mar 13, 2017 at 1:48 AM, Chesnay Schepler wrote:
> How long is the job running? Since you create a metric for the flatMap
> operation the metric will only be exposed as long as the flatMap operation
> is active. Thus, if this operation takes less than 10 seconds (the default
> interval for all reporters) then it may never report it.
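
For anyone copying the interval setting: the examples in the Flink metrics documentation write the reporter interval as a number followed by a time unit (for instance `60 SECONDS`). The fragment below is a sketch under that assumption, reusing the `stsd` reporter name from this thread; whether a bare `1` is accepted may depend on the Flink version, so the explicit unit is the safer form.

```yaml
# Sketch only: the "<value> <TIME_UNIT>" interval form is assumed from the Flink docs examples.
metrics.reporters: stsd
metrics.reporter.stsd.class: org.apache.flink.metrics.statsd.StatsDReporter
metrics.reporter.stsd.host: localhost
metrics.reporter.stsd.port: 8125
# Report every second instead of the 10-second default mentioned above.
metrics.reporter.stsd.interval: 1 SECONDS
```

A one-second interval is convenient for a quick test like this one; for a long-running job the default is usually enough.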