Fabian Hueske created FLINK-1343:
------------------------------------
Summary: Branching Join Program Deadlocks
Key: FLINK-1343
URL: https://issues.apache.org/jira/browse/FLINK-1343
Project: Flink
Issue Type: Bug
Components: Optimizer
Affects Versions: 0.8, 0.9
Reporter: Fabian Hueske
Assignee: Fabian Hueske
The following program deadlocks. It reads its data from a single non-parallel data source, branches twice, and joins the branches with two joins.
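{code:java}
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple1;

public class DeadlockProgram {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Single non-parallel data source (parallelism 1).
        DataSet<Long> longs = env.generateSequence(0, 1000000L).setParallelism(1);
        // Second, independent source; only used by the "works" variant below.
        DataSet<Long> longs2 = env.generateSequence(0, 1000000L).setParallelism(1);

        // First branch off the source. longT1 is consumed twice:
        // by project() and by the second join.
        DataSet<Tuple1<Long>> longT1 = longs.map(new TupleWrapper());
        DataSet<Tuple1<Long>> longT2 = longT1.project(0);
        // Second branch off the same source.
        DataSet<Tuple1<Long>> longT3 = longs.map(new TupleWrapper()); // deadlocks
        // DataSet<Tuple1<Long>> longT3 = longs2.map(new TupleWrapper()); // works

        // Join the branches back together with two joins.
        longT2.join(longT3).where(0).equalTo(0).projectFirst(0)
            .join(longT1).where(0).equalTo(0).projectFirst(0)
            .print();

        env.execute();
    }

    /** Wraps each Long value into a Tuple1. */
    public static class TupleWrapper implements MapFunction<Long, Tuple1<Long>> {
        @Override
        public Tuple1<Long> map(Long l) throws Exception {
            return new Tuple1<Long>(l);
        }
    }
}
{code}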
If one of the branches reads its data from a second data source (see inline comment) or if the single data source uses the default parallelism, the program executes correctly.
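The report does not state the root cause. One plausible mechanism, offered here purely as an assumption, is that a single pipelined producer feeding two branches through bounded buffers can stall when a downstream join consumes one input completely (as a hash join does with its build side) before it reads the other. The plain-Java sketch below models only that situation; it does not use Flink, and the class FanOutDeadlockModel, its run method, the buffer size of 4 and the timeouts are hypothetical illustrations, not Flink internals.

{code:java}
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical model (not Flink code): one source pushes every record into
// two branches; one "join" consumer drains branch B completely (build side)
// before it reads branch A (probe side).
public class FanOutDeadlockModel {

    // Returns true if all records flowed through, false if the pipeline stalled.
    // Timed offer/poll are used so a stall is detected instead of hanging forever.
    static boolean run(int records, BlockingQueue<Long> branchA, BlockingQueue<Long> branchB)
            throws InterruptedException {
        AtomicBoolean sourceOk = new AtomicBoolean(true);

        Thread source = new Thread(() -> {
            try {
                for (long i = 0; i < records; i++) {
                    // The single source must deliver each record to BOTH branches.
                    if (!branchA.offer(i, 200, TimeUnit.MILLISECONDS)
                            || !branchB.offer(i, 200, TimeUnit.MILLISECONDS)) {
                        sourceOk.set(false); // a branch buffer stayed full: stalled
                        return;
                    }
                }
            } catch (InterruptedException e) {
                sourceOk.set(false);
            }
        });
        source.start();

        boolean consumerOk = true;
        // Build phase: read branch B to the end before touching branch A.
        for (int i = 0; i < records && consumerOk; i++) {
            consumerOk = branchB.poll(500, TimeUnit.MILLISECONDS) != null;
        }
        // Probe phase: only now read branch A.
        for (int i = 0; i < records && consumerOk; i++) {
            consumerOk = branchA.poll(500, TimeUnit.MILLISECONDS) != null;
        }

        source.join();
        return sourceOk.get() && consumerOk;
    }

    public static void main(String[] args) throws InterruptedException {
        // Both branches pipelined through small bounded buffers.
        System.out.println("pipelined:    "
                + run(1000, new ArrayBlockingQueue<>(4), new ArrayBlockingQueue<>(4)));
        // Branch A fully buffered (materialized) until the probe phase.
        System.out.println("materialized: "
                + run(1000, new LinkedBlockingQueue<>(), new ArrayBlockingQueue<>(4)));
    }
}
{code}

Under these assumptions the pipelined run should report a stall: branch A's buffer fills after four records, the source blocks on it, and the consumer waits on branch B forever. The materialized run should complete, because fully buffering one branch decouples the shared producer from the order in which the consumer reads its inputs.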
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)