How to merge messages from all partitions


Naveen Tirupattur
Hi,

I am trying to group messages by message name and timestamp, and then aggregate the message value. My window function looks like this:

// Composite key: a second keyBy() call would re-key the stream by timeStamp only
metrics.keyBy("metricName", "timeStamp")
    .timeWindow(Time.seconds(30))
    .trigger(ProcessingTimeTrigger.create())
    .fold(new Tuple3<String,Long,Double>("",0L,0.0), new FoldFunction<Metric, Tuple3<String,Long,Double>>() {

      @Override
      public Tuple3<String, Long, Double> fold(Tuple3<String, Long, Double> arg0, Metric arg1) throws Exception {
        long timeStamp = arg1.getTimeStamp();
        String metricName = arg1.getMetricName();
        // Add to the running total held in the accumulator; "=+" only assigned the latest value
        double count = arg0.f2 + arg1.getValue();
        return new Tuple3<String,Long,Double>(metricName,timeStamp,count);
      }
    }).print();

My intention is to aggregate all messages of a particular type that occurred at timestamp t, across all partitions, to calculate a running mean. Some aggregation is happening, but intermediate values are printed for each partition. How do I get a single aggregated value across all partitions? Kindly help.

P.S. I am getting messages from a Kafka topic with 10 partitions.

Thanks,
Naveen

Re: How to merge messages from all partitions

Till Rohrmann
Hi Naveen,

You would have to apply an all-window reduce (a non-keyed window over the
whole stream) after you’ve aggregated the values by key:

metrics...
    .timeWindowAll(Time.seconds(30)).fold(..., ...);

However, be aware that this all-window fold operation will be executed by a
single operator instance (parallelism 1). You could also apply the all-window
operation right away, without the keyed window in front of it.
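
As a rough illustration of the two-stage idea, here is a minimal plain-Java sketch. It does not use the Flink API; it only models the logic. Each parallel instance folds its values into a partial (sum, count), and one final step merges the partials, which is roughly the role the parallelism-1 all-window fold would play. The names TwoStageMean, SumCount, foldPartition and mergeAll are hypothetical and chosen for this example only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TwoStageMean {

    /** Partial aggregate: sum and element count (hypothetical helper type). */
    static final class SumCount {
        final double sum;
        final long count;

        SumCount(double sum, long count) {
            this.sum = sum;
            this.count = count;
        }

        /** Fold one more value into this partial aggregate. */
        SumCount add(double value) {
            return new SumCount(sum + value, count + 1);
        }

        /** Combine two partial aggregates. */
        SumCount merge(SumCount other) {
            return new SumCount(sum + other.sum, count + other.count);
        }

        /** Mean of everything folded so far; 0.0 for an empty aggregate. */
        double mean() {
            return count == 0 ? 0.0 : sum / count;
        }
    }

    /** Stage 1: what one parallel instance does with the values it receives. */
    static SumCount foldPartition(List<Double> values) {
        SumCount acc = new SumCount(0.0, 0L);
        for (double v : values) {
            acc = acc.add(v);
        }
        return acc;
    }

    /** Stage 2: merge every partial in a single place (the parallelism-1 step). */
    static SumCount mergeAll(List<SumCount> partials) {
        SumCount total = new SumCount(0.0, 0L);
        for (SumCount p : partials) {
            total = total.merge(p);
        }
        return total;
    }

    public static void main(String[] args) {
        // Two made-up partitions of values for the same metric and timestamp
        List<SumCount> partials = new ArrayList<>();
        partials.add(foldPartition(Arrays.asList(1.0, 2.0, 3.0)));
        partials.add(foldPartition(Arrays.asList(4.0, 5.0)));

        SumCount total = mergeAll(partials);
        System.out.println("sum=" + total.sum + " count=" + total.count + " mean=" + total.mean());
    }
}
```

Carrying the sum and the count separately, rather than a per-partition mean, matters here: averaging the partial means would give the wrong result whenever the partitions hold different numbers of elements.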

Cheers,
Till


On Sat, Dec 10, 2016 at 3:46 AM, Naveen Tirupattur <[hidden email]> wrote:

> [...]