Hi,
We are facing a serious production issue with Flink. Any help would be appreciated.

We receive packets from a Kafka cluster. This cluster has a sudden drop in packets from 22:00 UTC till 00:30 UTC every day on a specific topic, say "topic A". Although our job reads from a different topic, say "topic B", we see that it drops a lot of packets during the same period (per the "laterecordsDropped" metric). At the same time, the job that reads from "topic A" shows a high fetch rate, and one of the brokers in this cluster shows an abnormal CPU rise, which I attribute to the high fetch rates.

We have a 1-minute tumbling window, with a 10-second bounded periodic watermark (watermarksPeriodicBounded), based on the packets' event time. Is there any reason why the job reading from "topic B" would have a higher number of records dropped?

The picture below is a screenshot where:
- "Laterecords dropped" corresponds to the job reading from "topic B".
- Fetch and consume rates relate to the job reading from "topic A" (which has the downward trend in traffic during the mentioned times).

All these graphs are correlated and we are unable to isolate the problem. There are other modules that consume from this topic, and none of them log slow records, which is why we are not sure whether this issue is with Flink alone.

Thanks.
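For reference, a minimal sketch of the kind of job described above (1-minute tumbling event-time windows with a 10-second bounded-out-of-orderness watermark) in the Flink 1.8-era DataStream API. The Packet type, its deserialization schema, the aggregate function, and the topic/broker names are placeholders, not taken from the thread:

    import java.util.Properties;

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    Properties kafkaProps = new Properties();
    kafkaProps.setProperty("bootstrap.servers", "broker:9092");   // placeholder
    kafkaProps.setProperty("group.id", "topic-b-consumer");       // placeholder

    DataStream<Packet> packets = env
        .addSource(new FlinkKafkaConsumer<>("topicB", new PacketDeserializationSchema(), kafkaProps))
        // Periodic watermark that trails the highest seen event time by 10 seconds.
        .assignTimestampsAndWatermarks(
            new BoundedOutOfOrdernessTimestampExtractor<Packet>(Time.seconds(10)) {
                @Override
                public long extractTimestamp(Packet packet) {
                    return packet.getEventTimeMillis();   // event time in epoch millis
                }
            });

    packets
        .keyBy(p -> p.getKey())
        // 1-minute tumbling window on event time. Records arriving after the
        // watermark has passed the end of their window (plus any allowed
        // lateness) are dropped and counted in numLateRecordsDropped.
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        .aggregate(new PacketCountAggregate())
        .print();

    env.execute("topic-b-window-job");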
Hi Ramya,
I am a little confused about the situation here. Is the following what you saw:

1. The Kafka topic A has a traffic drop from 22:00 UTC to 00:30 UTC.
2. Your Flink job (say job 1) reads from topic A, but it has a high fetch rate with abnormally high CPU consumption during the period mentioned in (1).
3. Your Flink job (say job 2) reads from topic B, and it sees a lot of messages dropped as late arrivals, i.e. the watermark has already passed those messages' timestamps plus the allowed lateness.

What is the relationship between job 1 and job 2? Are they the same job? Is there any producer sending "old" messages to the Kafka cluster, which might cause those messages to be dropped by Flink due to their old timestamps?

Unfortunately the image does not come through on the Apache mailing list. Can you post the image somewhere and send the link instead?

Thanks,

Jiangjie (Becket) Qin
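To make the late-drop condition concrete (numbers are illustrative, not from the thread): with 1-minute windows, a 10-second watermark bound and no allowed lateness, once the source has seen an event timestamped 10:01:10, the watermark advances to 10:01:00 and the window [10:00:00, 10:01:00) fires. A record timestamped 10:00:45 that arrives after that point can no longer be assigned to a live window, so it is dropped and counted in the laterecordsDropped metric. A producer whose clock runs ahead therefore pushes the watermark forward and makes correctly timestamped records from other producers look late.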
Hi Jiangjie,
Thanks for your response. I was able to figure out the issue.

We have multiple endpoints from which we receive data. On one of them, NTP was not set up, or rather was not syncing properly, so those VMs were sending packets timestamped about 2 minutes ahead of time. As a result, the packets from the various other sources coming into that Kafka cluster were dropped as late. Increasing the watermark bound to 3 minutes (from 1 minute) stopped the "laterecordsdropped".

But this is worrisome: if anything like this happens again, it would affect our systems badly, as we cannot afford to lose data. Is there a better way to approach this? Flink tables do not have side outputs to collect these lost packets either, which is also a concern. Is this a feature in the making? I can see that Flink tables aren't yet as evolved as streams. Let me know what you think.

Regards,
~Ramya.
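On collecting the dropped records: the DataStream API can route late records to a side output instead of silently dropping them; the Table API (as of the 1.8/1.9 releases discussed here) has no equivalent. A sketch, reusing the placeholder types and window settings from above (the 2-minute allowed lateness is just an example value):

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.OutputTag;

    // Anonymous subclass so the OutputTag keeps its type information.
    final OutputTag<Packet> lateTag = new OutputTag<Packet>("late-packets") {};

    SingleOutputStreamOperator<PacketCount> counts = packets
        .keyBy(p -> p.getKey())
        .window(TumblingEventTimeWindows.of(Time.minutes(1)))
        // Keep window state around a little longer so stragglers still update it...
        .allowedLateness(Time.minutes(2))
        // ...and anything later than that goes to the side output instead of being dropped.
        .sideOutputLateData(lateTag)
        .aggregate(new PacketCountAggregate());

    // The would-be-dropped records, available as a normal stream: write them to a
    // sink of your choice (another Kafka topic, a file, ...) instead of losing them.
    DataStream<Packet> lateRecords = counts.getSideOutput(lateTag);
    lateRecords.print();

If the pipeline is expressed purely in the Table API / SQL, one workaround at the time was to do the event-time windowing in the DataStream API (or convert the table back to a stream first) so that sideOutputLateData is available.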
Is the watermarking configured per-partition in Kafka, or per source?
If it is configured per partition, then a late (trailing) or early (leading) partition would not affect the watermark as a whole. There would not be any dropping of late data, only a delay in the results until the partition that is furthest behind (watermark-wise) has caught up.

Best,
Stephan
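For reference, per-partition watermarking is configured by handing the assigner to the Kafka consumer itself rather than to the resulting stream. A sketch, under the same placeholder types as above:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

    FlinkKafkaConsumer<Packet> consumer =
        new FlinkKafkaConsumer<>("topicB", new PacketDeserializationSchema(), kafkaProps);

    // Assigner given to the consumer: each Kafka partition tracks its own
    // watermark, and the source emits the minimum across its partitions. A
    // single partition running ahead (e.g. the hosts with the broken NTP sync)
    // cannot push the overall watermark past the others, so their records are
    // not marked late; results are only delayed until the slowest partition
    // catches up.
    consumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Packet>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Packet packet) {
                return packet.getEventTimeMillis();
            }
        });

    DataStream<Packet> packets = env.addSource(consumer);
    // Calling assignTimestampsAndWatermarks on `packets` instead would compute
    // one watermark per source subtask, after records from its partitions are merged.

Note that this only helps if the ahead-of-time producers write to their own partition(s); if their records are interleaved with normal records in the same partitions, the per-partition watermark is still pushed ahead.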