[ https://issues.apache.org/jira/browse/FLINK-935?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Stephan Ewen resolved FLINK-935. -------------------------------- Resolution: Fixed Fix Version/s: (was: pre-apache-0.5.2) Fixed via commit ca2b287a7a78328ebf43766b9fdf39b56fb5fd4f > Bug in Optimizer when reusing work across iterations > ---------------------------------------------------- > > Key: FLINK-935 > URL: https://issues.apache.org/jira/browse/FLINK-935 > Project: Flink > Issue Type: Bug > Components: Compiler/Optimizer > Affects Versions: 0.6-incubating, pre-apache-0.5, pre-apache-0.5.1 > Reporter: Stephan Ewen > Assignee: Stephan Ewen > Fix For: 0.6-incubating > > Attachments: screenshot_plan.png > > > The following created plan is invalid > {code} > { > "nodes": [ > { > "id": 3, > "type": "source", > "pact": "Data Source", > "contents": "CSV Input (,) /some/file/path", > "parallelism": "4", > "subtasks_per_instance": "1", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 2, > "type": "pact", > "pact": "Map", > "contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$DuplicateValue", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 3, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "HASH_PARTITIONED" }, > { "name": "Partitioned on", "value": "[0]" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "[0:ASC]" }, > { "name": "Grouped on", "value": "[0]" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "step_function": [ > { > "id": 9, > "type": "pact", > "pact": "Bulk Partial Solution", > "contents": "Partial Solution", > "parallelism": "4", > "subtasks_per_instance": "1", > "global_properties": [ > { "name": "Partitioning", "value": "HASH_PARTITIONED" }, > { "name": "Partitioned on", "value": "[0]" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "[0:ASC]" }, > { "name": "Grouped on", "value": "[0]" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 10, > "type": "source", > "pact": "Data Source", > "contents": "CSV Input (,) /some/file/path", > "parallelism": "4", > "subtasks_per_instance": "1", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "0.0" }, > { "name": "Cumulative Disk I/O", "value": "0.0" }, > { "name": "Cumulative CPU", "value": "0.0" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 8, > "type": "pact", > "pact": "Join", > "contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$Join222", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 9, "side": "first", "ship_strategy": "Forward"}, > {"id": 10, "side": "second", "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "temp_mode": "CACHED"} > ], > "driver_strategy": "Merge", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 7, > "type": "pact", > "pact": "GroupReduce", > "contents": "MIN(1)", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 8, "ship_strategy": "Forward"} > ], > "driver_strategy": "Sorted Combine", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 6, > "type": "pact", > "pact": "GroupReduce", > "contents": "MIN(1)", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"} > ], > "driver_strategy": "Sorted Group Reduce", > "global_properties": [ > { "name": "Partitioning", "value": "HASH_PARTITIONED" }, > { "name": "Partitioned on", "value": "[0]" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "[0:ASC]" }, > { "name": "Grouped on", "value": "[0]" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 5, > "type": "pact", > "pact": "Join", > "contents": "eu.stratosphere.api.java.operators.JoinOperator$DefaultJoinFunction", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 6, "side": "first", "ship_strategy": "Forward"}, > {"id": 9, "side": "second", "ship_strategy": "Forward", "temp_mode": "PIPELINE_BREAKER"} > ], > "driver_strategy": "Merge", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 4, > "type": "pact", > "pact": "FlatMap", > "contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$FlatMapJoin", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 5, "ship_strategy": "Forward"} > ], > "driver_strategy": "Map", > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > } > ], > "partial_solution": 9, > "next_partial_solution": 4, > "id": 1, > "type": "bulk_iteration", > "pact": "Bulk Iteration", > "contents": "Bulk Iteration", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 2, "ship_strategy": "Forward"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "(unknown)" }, > { "name": "Disk I/O", "value": "(unknown)" }, > { "name": "CPU", "value": "(unknown)" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > }, > { > "id": 0, > "type": "sink", > "pact": "Data Sink", > "contents": "Print to System.out", > "parallelism": "4", > "subtasks_per_instance": "1", > "predecessors": [ > {"id": 1, "ship_strategy": "Forward"} > ], > "global_properties": [ > { "name": "Partitioning", "value": "RANDOM" }, > { "name": "Partitioning Order", "value": "(none)" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "local_properties": [ > { "name": "Order", "value": "(none)" }, > { "name": "Grouping", "value": "not grouped" }, > { "name": "Uniqueness", "value": "not unique" } > ], > "estimates": [ > { "name": "Est. Output Size", "value": "(unknown)" }, > { "name": "Est. Cardinality", "value": "(unknown)" } ], > "costs": [ > { "name": "Network", "value": "0.0" }, > { "name": "Disk I/O", "value": "0.0" }, > { "name": "CPU", "value": "0.0" }, > { "name": "Cumulative Network", "value": "(unknown)" }, > { "name": "Cumulative Disk I/O", "value": "(unknown)" }, > { "name": "Cumulative CPU", "value": "(unknown)" } > ], > "compiler_hints": [ > { "name": "Output Size (bytes)", "value": "(none)" }, > { "name": "Output Cardinality", "value": "(none)" }, > { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, > { "name": "Filter Factor", "value": "(none)" } ] > } > ] > } > {code} -- This message was sent by Atlassian JIRA (v6.2#6252) |
Free forum by Nabble | Edit this page |