Haibo Sun created FLINK-16174:
---------------------------------
Summary: Add a better tryYield() method to MailboxExecutor to return the lowest priority of the remaining mails
Key: FLINK-16174
URL: https://issues.apache.org/jira/browse/FLINK-16174
Project: Flink
Issue Type: Improvement
Components: Runtime / Task
Affects Versions: 1.10.0
Reporter: Haibo Sun
Currently, we use chainIndex as the priority to create MailboxExecutor to process its mails. When MailboxExecutor#tryYield is called to process mails, it will take the mails of this operator and all downstream operators in the chain. But sometimes, after calling MailboxExecutor#tryYield, we need to know whether there is any mail of the current operator in the mailbox, which can simplify some operations.
For example, when we close an operator at runtime, after quiescing the processing time service and waiting for its running timers to finish, if there is no mail of the current operator in the mailbox, we call StreamOperator#close to close the operator. Then the runtime code for closing an operator can be simplified as follows.
{code:java}
quiesceProcessingTimeService().get();
while (mailboxExecutor.betterTryYield() <= self.priority) {}
closeOperator(actionExecutor);
{code}
With the existing #tryYield method, if the following simplified code is used to close an operator, then when a downstream operator is implemented like MailboxOperatorTest.ReplicatingMail, the tryYield() loop will be prevented from exiting, which results in a deadlock.
{code:java}
quiesceProcessingTimeService().get();
while (mailboxExecutor.tryYield()) {}
closeOperator(actionExecutor);
{code}
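To make the difference between the two loops concrete, here is a minimal toy model of the idea. It is only a sketch and not Flink's actual TaskMailbox or MailboxExecutor: the class MailboxSketch, the helper putReplicating, the sentinel NO_MAIL and the exact return contract of betterTryYield are all assumptions made for illustration. It assumes that a mail is eligible when its priority is greater than or equal to the caller's priority (this operator and everything downstream), and that betterTryYield returns NO_MAIL when no eligible mail remains.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Toy model for illustration only; all names here are hypothetical.
class MailboxSketch {
    /** An action tagged with the priority (chainIndex) of the operator that enqueued it. */
    static final class Mail {
        final int priority;
        final Runnable action;

        Mail(int priority, Runnable action) {
            this.priority = priority;
            this.action = action;
        }
    }

    /** Assumed sentinel: returned when no eligible mail is left. */
    static final int NO_MAIL = Integer.MAX_VALUE;

    private final Deque<Mail> queue = new ArrayDeque<>();

    void put(int priority, Runnable action) {
        queue.addLast(new Mail(priority, action));
    }

    /** Enqueues a mail that re-enqueues itself every time it runs. */
    void putReplicating(int priority) {
        put(priority, () -> putReplicating(priority));
    }

    /** Removes and returns the oldest mail with priority >= minPriority, or null. */
    private Mail take(int minPriority) {
        for (Iterator<Mail> it = queue.iterator(); it.hasNext(); ) {
            Mail mail = it.next();
            if (mail.priority >= minPriority) {
                it.remove();
                return mail;
            }
        }
        return null;
    }

    /** Existing-style behavior: runs one eligible mail and reports whether one was run. */
    boolean tryYield(int minPriority) {
        Mail mail = take(minPriority);
        if (mail == null) {
            return false;
        }
        mail.action.run();
        return true;
    }

    /**
     * Hypothetical variant: runs at most one eligible mail, then returns the
     * lowest priority among the remaining eligible mails, or NO_MAIL if none.
     */
    int betterTryYield(int minPriority) {
        Mail mail = take(minPriority);
        if (mail != null) {
            mail.action.run();
        }
        int lowest = NO_MAIL;
        for (Mail remaining : queue) {
            if (remaining.priority >= minPriority && remaining.priority < lowest) {
                lowest = remaining.priority;
            }
        }
        return lowest;
    }

    public static void main(String[] args) {
        final int self = 0; // priority of the operator being closed
        MailboxSketch mailbox = new MailboxSketch();
        int[] ownRuns = {0};
        mailbox.put(self, () -> ownRuns[0]++);
        mailbox.putReplicating(self + 1); // downstream mail that never goes away
        mailbox.put(self, () -> ownRuns[0]++);

        // Exits once only downstream mails remain, even though the mailbox is never empty.
        while (mailbox.betterTryYield(self) <= self) {}
        System.out.println("own mails run: " + ownRuns[0]);

        // By contrast, tryYield keeps succeeding because of the replicating mail.
        System.out.println("tryYield still true: " + mailbox.tryYield(self));
    }
}
```

In this model the betterTryYield loop stops as soon as every mail of the current operator has run, whereas a `while (tryYield()) {}` loop would spin for as long as the downstream operator keeps re-enqueuing its mail.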
--
This message was sent by Atlassian Jira
(v8.3.4#803005)