Kezhu Wang created FLINK-19717:
----------------------------------

Summary: SourceReaderBase.pollNext may return END_OF_INPUT if SplitReader.fetch throws
Key: FLINK-19717
URL: https://issues.apache.org/jira/browse/FLINK-19717
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.12.0
Reporter: Kezhu Wang

Here is the execution flow I have in mind:

1. In the mailbox thread, we enter {{SourceReaderBase.getNextFetch}}. After {{splitFetcherManager.checkErrors()}} executes but before {{elementsQueue.poll()}} does, the {{SplitFetcher}} gets its chance to run.
2. The {{SplitFetcher}} terminates because {{SplitReader.fetch}} throws an exception. {{SplitFetcher.shutdownHook}} then removes this failed fetcher from {{SplitFetcherManager}}.
3. In the mailbox thread, {{elementsQueue.poll()}} executes. If there are no elements in the queue, {{elementsQueue}} is reset to unavailable.
4. Having received no elements from {{SourceReaderBase.getNextFetch}}, we enter {{SourceReaderBase.finishedOrAvailableLater}}. If the failed fetcher was the last live fetcher, {{SourceReaderBase.finishedOrAvailableLater}} may evaluate to {{InputStatus.END_OF_INPUT}}.
5. {{StreamTask}} terminates itself because of {{InputStatus.END_OF_INPUT}}, instead of failing with the exception thrown by {{SplitReader.fetch}}.

Here is a revised {{SourceReaderBaseTest.testExceptionInSplitReader}}, which fails in roughly 1 out of 2 runs.

{code:java}
@Test
public void testExceptionInSplitReader() throws Exception {
    expectedException.expect(RuntimeException.class);
    expectedException.expectMessage("One or more fetchers have encountered exception");
    final String errMsg = "Testing Exception";

    FutureCompletingBlockingQueue<RecordsWithSplitIds<int[]>> elementsQueue =
            new FutureCompletingBlockingQueue<>();
    // We have to handle split changes first, otherwise fetch will not be called.
    try (MockSourceReader reader = new MockSourceReader(
            elementsQueue,
            () -> new SplitReader<int[], MockSourceSplit>() {
                @Override
                public RecordsWithSplitIds<int[]> fetch() {
                    throw new RuntimeException(errMsg);
                }

                @Override
                public void handleSplitsChanges(SplitsChange<MockSourceSplit> splitsChanges) {}

                @Override
                public void wakeUp() {}
            },
            getConfig(),
            null)) {
        ValidatingSourceOutput output = new ValidatingSourceOutput();
        reader.addSplits(Collections.singletonList(
                getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED)));
        reader.handleSourceEvents(new NoMoreSplitsEvent());
        // This is not a real infinite loop, it is supposed to throw an exception after some polls.
        while (true) {
            InputStatus inputStatus = reader.pollNext(output);
            assertNotEquals(InputStatus.END_OF_INPUT, inputStatus);
            // Add a sleep to avoid a tight loop.
            Thread.sleep(0);
        }
    }
}
{code}

This revised {{SourceReaderBaseTest.testExceptionInSplitReader}} differs from the existing one in three places:

1. {{reader.handleSourceEvents(new NoMoreSplitsEvent())}} sets {{SourceReaderBase.noMoreSplitsAssignment}} to true.
2. An assertion checks that {{reader.pollNext}} never returns {{InputStatus.END_OF_INPUT}}.
3. {{Thread.sleep(1)}} is changed to {{Thread.sleep(0)}}, which raises the failure rate from about 1/200 to about 1/2.

See [FLINK-19448|https://issues.apache.org/jira/browse/FLINK-19448] for the initial discussion.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
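The interleaving in steps 1 to 5 can be replayed deterministically, which makes the check-then-act gap easier to see than in the flaky test. The following is a hypothetical, single-threaded Java model, not Flink code: {{FetcherManager}}, {{Reader}}, {{fetcherFailed}} and {{fetcherError}} are invented stand-ins for {{SplitFetcherManager}}, {{SplitFetcher.shutdownHook}}, {{elementsQueue}} and {{SourceReaderBase}}, a plain {{ArrayDeque}} replaces {{FutureCompletingBlockingQueue}}, and the end-of-input condition only approximates {{finishedOrAvailableLater}}. The fetcher thread's work is injected as a callback between {{checkErrors()}} and {{poll()}}, so the interleaving that the real test hits only about half the time happens on every run here.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

/** Hypothetical model of the FLINK-19717 interleaving; none of these types are Flink APIs. */
public class EndOfInputRaceSketch {

    enum InputStatus { MORE_AVAILABLE, NOTHING_AVAILABLE, END_OF_INPUT }

    /** Stand-in for SplitFetcherManager: live fetcher ids plus the first recorded fetcher error. */
    static final class FetcherManager {
        final Set<Integer> liveFetchers = new HashSet<>();
        Throwable fetcherError;

        /** Mirrors checkErrors(): rethrows a recorded fetcher error, if any. */
        void checkErrors() {
            if (fetcherError != null) {
                throw new RuntimeException(
                        "One or more fetchers have encountered exception", fetcherError);
            }
        }

        /** Models a fetcher dying: the error is recorded, then the shutdown hook removes the fetcher. */
        void fetcherFailed(int fetcherId, Throwable error) {
            fetcherError = error;
            liveFetchers.remove(fetcherId);
        }
    }

    /** Stand-in for SourceReaderBase with a simplified, single-threaded pollNext. */
    static final class Reader {
        final FetcherManager manager = new FetcherManager();
        final Queue<int[]> elementsQueue = new ArrayDeque<>();
        boolean noMoreSplitsAssignment;

        /** fetcherThreadStep runs exactly where the fetcher thread can sneak in (step 2). */
        InputStatus pollNext(Runnable fetcherThreadStep) {
            manager.checkErrors();               // step 1: no error recorded yet
            fetcherThreadStep.run();             // step 2: fetcher fails and is removed
            int[] batch = elementsQueue.poll();  // step 3: queue is empty
            if (batch != null) {
                return InputStatus.MORE_AVAILABLE;
            }
            // Step 4: approximation of the end-of-input decision; the recorded error is not consulted.
            if (noMoreSplitsAssignment && manager.liveFetchers.isEmpty()) {
                return InputStatus.END_OF_INPUT;
            }
            return InputStatus.NOTHING_AVAILABLE;
        }
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        reader.manager.liveFetchers.add(0);  // one live fetcher
        reader.noMoreSplitsAssignment = true; // as after NoMoreSplitsEvent

        InputStatus status = reader.pollNext(
                () -> reader.manager.fetcherFailed(0, new RuntimeException("Testing Exception")));
        // Prints: END_OF_INPUT, fetcher error recorded: true
        System.out.println(status + ", fetcher error recorded: " + (reader.manager.fetcherError != null));
    }
}
```

In this model, a further {{pollNext}} call would reach {{checkErrors()}} and throw, but once {{END_OF_INPUT}} has been returned (step 5) no further call is made, so the recorded error is silently dropped.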