Posted by
Shang Yuanchun (Jira) on
Jun 13, 2014; 3:09pm
URL: http://deprecated-apache-flink-mailing-list-archive.368.s1.nabble.com/jira-Created-FLINK-935-Bug-in-Optimizer-when-reusing-work-across-iterations-tp220.html
Stephan Ewen created FLINK-935:
----------------------------------
Summary: Bug in Optimizer when reusing work across iterations
Key: FLINK-935
URL: https://issues.apache.org/jira/browse/FLINK-935
Project: Flink
Issue Type: Bug
Components: Compiler/Optimizer
Affects Versions: 0.6-incubating, pre-apache-0.5, pre-apache-0.5.1
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 0.6-incubating, pre-apache-0.5.2
Attachments: screenshot.png
The following plan, created by the optimizer, is invalid:
{code}
{
"nodes": [
{
"id": 3,
"type": "source",
"pact": "Data Source",
"contents": "CSV Input (,) /some/file/path",
"parallelism": "4",
"subtasks_per_instance": "1",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 2,
"type": "pact",
"pact": "Map",
"contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$DuplicateValue",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 3, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"step_function": [
{
"id": 9,
"type": "pact",
"pact": "Bulk Partial Solution",
"contents": "Partial Solution",
"parallelism": "4",
"subtasks_per_instance": "1",
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 10,
"type": "source",
"pact": "Data Source",
"contents": "CSV Input (,) /some/file/path",
"parallelism": "4",
"subtasks_per_instance": "1",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "0.0" },
{ "name": "Cumulative Disk I/O", "value": "0.0" },
{ "name": "Cumulative CPU", "value": "0.0" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 8,
"type": "pact",
"pact": "Join",
"contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$Join222",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 9, "side": "first", "ship_strategy": "Forward"},
{"id": 10, "side": "second", "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "temp_mode": "CACHED"}
],
"driver_strategy": "Merge",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 7,
"type": "pact",
"pact": "GroupReduce",
"contents": "MIN(1)",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 8, "ship_strategy": "Forward"}
],
"driver_strategy": "Sorted Combine",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 6,
"type": "pact",
"pact": "GroupReduce",
"contents": "MIN(1)",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
],
"driver_strategy": "Sorted Group Reduce",
"global_properties": [
{ "name": "Partitioning", "value": "HASH_PARTITIONED" },
{ "name": "Partitioned on", "value": "[0]" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "[0:ASC]" },
{ "name": "Grouped on", "value": "[0]" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 5,
"type": "pact",
"pact": "Join",
"contents": "eu.stratosphere.api.java.operators.JoinOperator$DefaultJoinFunction",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 6, "side": "first", "ship_strategy": "Forward"},
{"id": 9, "side": "second", "ship_strategy": "Forward", "temp_mode": "PIPELINE_BREAKER"}
],
"driver_strategy": "Merge",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 4,
"type": "pact",
"pact": "FlatMap",
"contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$FlatMapJoin",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 5, "ship_strategy": "Forward"}
],
"driver_strategy": "Map",
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
}
],
"partial_solution": 9,
"next_partial_solution": 4,
"id": 1,
"type": "bulk_iteration",
"pact": "Bulk Iteration",
"contents": "Bulk Iteration",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 2, "ship_strategy": "Forward"}
],
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "(unknown)" },
{ "name": "Disk I/O", "value": "(unknown)" },
{ "name": "CPU", "value": "(unknown)" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
},
{
"id": 0,
"type": "sink",
"pact": "Data Sink",
"contents": "Print to System.out",
"parallelism": "4",
"subtasks_per_instance": "1",
"predecessors": [
{"id": 1, "ship_strategy": "Forward"}
],
"global_properties": [
{ "name": "Partitioning", "value": "RANDOM" },
{ "name": "Partitioning Order", "value": "(none)" },
{ "name": "Uniqueness", "value": "not unique" }
],
"local_properties": [
{ "name": "Order", "value": "(none)" },
{ "name": "Grouping", "value": "not grouped" },
{ "name": "Uniqueness", "value": "not unique" }
],
"estimates": [
{ "name": "Est. Output Size", "value": "(unknown)" },
{ "name": "Est. Cardinality", "value": "(unknown)" } ],
"costs": [
{ "name": "Network", "value": "0.0" },
{ "name": "Disk I/O", "value": "0.0" },
{ "name": "CPU", "value": "0.0" },
{ "name": "Cumulative Network", "value": "(unknown)" },
{ "name": "Cumulative Disk I/O", "value": "(unknown)" },
{ "name": "Cumulative CPU", "value": "(unknown)" }
],
"compiler_hints": [
{ "name": "Output Size (bytes)", "value": "(none)" },
{ "name": "Output Cardinality", "value": "(none)" },
{ "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
{ "name": "Filter Factor", "value": "(none)" } ]
}
]
}
{code}
--
This message was sent by Atlassian JIRA
(v6.2#6252)