A Question About Flink Network Stack


A Question About Flink Network Stack

Jiayi Liao
Hi all,
I'm not very familiar with the network part of Flink, and a question occurred to me after reading most of the related source code. I've seen that Flink uses a credit-based mechanism to solve the blocking problem on the receiver side, which means that one "slow" input channel won't block other input channels' consumption, because each channel has its own exclusive credits.
However, on the sender side, I find that the memory segments sent to all receivers' channels share the same local segment pool (LocalBufferPool), which may cause a problem. Assume we have a non-parallel source whose output is partitioned to a map operator with a parallelism of two, and one of the map tasks is consuming very slowly. Is it possible that the memory segments destined for the slower receiver fill up the whole local segment pool, blocking the data that should be sent to the faster receiver?
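For concreteness, a minimal job matching this setup could look like the sketch below (my own illustration of the scenario, not code taken from Flink): a non-parallel source, rebalanced to a map with parallelism two, where one subtask is artificially slowed down.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class SlowSubpartitionDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.generateSequence(0, Long.MAX_VALUE)
           .setParallelism(1)                    // non-parallel source
           .rebalance()                          // round-robin over both map subtasks
           .map(new RichMapFunction<Long, Long>() {
               @Override
               public Long map(Long value) throws Exception {
                   // make subtask 1 the "slow" consumer
                   if (getRuntimeContext().getIndexOfThisSubtask() == 1) {
                       Thread.sleep(10);
                   }
                   return value;
               }
           })
           .setParallelism(2)
           .addSink(new DiscardingSink<>());

        env.execute("slow-subpartition-demo");
    }
}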
I appreciate any comments or answers, and please correct me if I am wrong about this.




Best Regards,
Jiayi Liao

Re: A Question About Flink Network Stack

Zhijiang(wangzhijiang999)
Hi Jiayi,

Thanks for your interest in the network stack; you have raised a very good question.

Your understanding is right. In credit-based mode, the receiver side has a fixed number of exclusive buffers (credits) for each remote input channel, so that every channel can receive data in parallel without blocking the others.
The receiver also has a floating buffer pool shared by all input channels, used to grant extra credits to channels whose sender reports a large backlog.
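In Flink 1.5 to 1.9 these sizes are configured via taskmanager.network.memory.buffers-per-channel (default 2 exclusive buffers per channel) and taskmanager.network.memory.floating-buffers-per-gate (default 8). As a rough illustration, the credit accounting can be pictured with the toy model below; this is only a sketch of the idea, not Flink's actual classes.

import java.util.concurrent.Semaphore;

// Toy model: exclusive credits guarantee per-channel progress, while a
// floating pool shared by the whole input gate tops up channels whose
// sender announces a large backlog.
public class ToyInputChannel {
    private final Semaphore exclusive = new Semaphore(2); // per channel
    private final Semaphore floating;                     // shared by the gate

    public ToyInputChannel(Semaphore sharedFloatingPool) {
        this.floating = sharedFloatingPool;
    }

    /** Reserve one credit to announce to the sender.
     *  Returns true for an exclusive credit, false for a floating one. */
    public boolean reserveCredit() {
        if (exclusive.tryAcquire()) {
            return true;  // never competes with other channels
        }
        if (floating.tryAcquire()) {
            return false; // extra credit drawn from the shared pool
        }
        throw new IllegalStateException("no credit; sender must pause this channel");
    }

    public void releaseCredit(boolean wasExclusive) {
        if (wasExclusive) {
            exclusive.release();
        } else {
            floating.release();
        }
    }
}

All channels of one input gate would share a single new Semaphore(8) as the floating pool.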

On the sender side, a single buffer pool is still shared by all the subpartitions. In one-to-one mode, where one producer produces data for only one consumer, there is no further concern. In all-to-all mode, where one producer emits data to all consumers, the buffers in the pool may gradually accumulate in the slow subpartition until the pool is exhausted, leaving the other, faster subpartitions with no available buffers to fill with more data.
This eventually causes backpressure.
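To make this concrete, here is a small self-contained simulation (a sketch of the effect only, not Flink code): a bounded queue stands in for the shared LocalBufferPool, and once the slow subpartition has absorbed every buffer, the producer blocks even though the fast consumer is idle.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SharedPoolDemo {
    public static void main(String[] args) throws Exception {
        // 8 recyclable "memory segments" shared by both subpartitions
        BlockingQueue<byte[]> pool = new ArrayBlockingQueue<>(8);
        for (int i = 0; i < 8; i++) {
            pool.put(new byte[32 * 1024]);
        }

        BlockingQueue<byte[]> fast = new ArrayBlockingQueue<>(1024); // drained below
        BlockingQueue<byte[]> slow = new ArrayBlockingQueue<>(1024); // never drained

        // Fast consumer: drains its subpartition and recycles buffers immediately.
        Thread fastConsumer = new Thread(() -> {
            try {
                while (true) {
                    pool.put(fast.take());
                }
            } catch (InterruptedException ignored) {
            }
        });
        fastConsumer.setDaemon(true);
        fastConsumer.start();

        // Round-robin producer: after roughly 16 records all 8 buffers are
        // stuck in 'slow', and pool.take() blocks forever, stalling both outputs.
        for (long record = 0; ; record++) {
            byte[] buffer = pool.take();
            if (record % 2 == 0) {
                fast.put(buffer);
            } else {
                slow.put(buffer);
            }
            System.out.println("emitted record " + record);
        }
    }
}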

The operator does not know the state of buffer usage, so it cannot prioritize which records to emit. Only once a record has been emitted by the producer do we learn, via the ChannelSelector, which subpartition it belongs to. If we chose not to serialize that record into the slow subpartition, so as not to occupy its buffers, we would need additional memory to cache the record, which is unexpected overhead and could cause instability. So on the producer side there seems to be no choice but to continue until the buffer resources are exhausted.
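At the user API level the same decision point is visible in a custom partitioner (a minimal illustration of my own; internally the RecordWriter asks its ChannelSelector the equivalent question for every record):

import org.apache.flink.api.common.functions.Partitioner;

public class ModuloPartitioner implements Partitioner<Long> {
    @Override
    public int partition(Long key, int numPartitions) {
        // The target subpartition is only determined here, per record and
        // after the record has already been produced, so the producer
        // cannot steer records away from a slow subpartition in advance.
        // (Assumes non-negative keys.)
        return (int) (key % numPartitions);
    }
}

It would be wired in with stream.partitionCustom(new ModuloPartitioner(), value -> value).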

Credit-based flow control is not meant to solve the backpressure issue, which cannot be avoided completely. It brings clear benefits in backpressure scenarios for one-to-one channels sharing a TCP connection, avoids unstable memory overhead in the Netty stack, and speeds up exactly-once checkpoints by avoiding the spilling of blocked data.

In addition, we once implemented an improvement to the RebalanceStrategy that takes this slow-subpartition issue into account. With the rebalance channel selector, a record can actually be emitted to any subpartition without correctness issues. So when a record is emitted, we select the fastest subpartition to take it, based on the current backlog size, instead of the previous round-robin approach. This can bring benefits in some scenarios.
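A hypothetical sketch of that idea (the names and structure here are illustrative, not the actual implementation): keep a backlog counter per subpartition and pick the smallest one instead of strict round-robin.

// Illustrative only: valid for rebalance, where any subpartition is correct.
public class BacklogAwareSelector {
    private final int[] backlog; // pending (unconsumed) buffers per subpartition

    public BacklogAwareSelector(int numChannels) {
        this.backlog = new int[numChannels];
    }

    /** Pick the subpartition with the smallest backlog, i.e. the fastest consumer. */
    public int selectChannel() {
        int best = 0;
        for (int i = 1; i < backlog.length; i++) {
            if (backlog[i] < backlog[best]) {
                best = i;
            }
        }
        backlog[best]++; // the record is now queued on that subpartition
        return best;
    }

    /** Called when the consumer fetches one buffer from this subpartition. */
    public void onBufferConsumed(int channel) {
        backlog[channel]--;
    }
}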

Best,
Zhijiang


Re: A Question About Flink Network Stack

Jiayi Liao
Hi Zhijiang,


Thank you for the detailed explanation!


Best Regards,
Jiayi Liao



