[jira] [Created] (FLINK-935) Bug in Optimizer when reusing work across iterations

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-935) Bug in Optimizer when reusing work across iterations

Shang Yuanchun (Jira)
Stephan Ewen created FLINK-935:
----------------------------------

             Summary: Bug in Optimizer when reusing work across iterations
                 Key: FLINK-935
                 URL: https://issues.apache.org/jira/browse/FLINK-935
             Project: Flink
          Issue Type: Bug
          Components: Compiler/Optimizer
    Affects Versions: 0.6-incubating, pre-apache-0.5, pre-apache-0.5.1
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen
             Fix For: 0.6-incubating, pre-apache-0.5.2
         Attachments: screenshot.png

The following created plan is invalid

{code}
{
        "nodes": [

        {
                "id": 3,
                "type": "source",
                "pact": "Data Source",
                "contents": "CSV Input (,) /some/file/path",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 2,
                "type": "pact",
                "pact": "Map",
                "contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$DuplicateValue",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 3, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "step_function": [
        {
                "id": 9,
                "type": "pact",
                "pact": "Bulk Partial Solution",
                "contents": "Partial Solution",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 10,
                "type": "source",
                "pact": "Data Source",
                "contents": "CSV Input (,) /some/file/path",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "0.0" },
                        { "name": "Cumulative Disk I/O", "value": "0.0" },
                        { "name": "Cumulative CPU", "value": "0.0" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 8,
                "type": "pact",
                "pact": "Join",
                "contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$Join222",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 9, "side": "first", "ship_strategy": "Forward"},
                        {"id": 10, "side": "second", "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort on [0:ASC]", "temp_mode": "CACHED"}
                ],
                "driver_strategy": "Merge",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 7,
                "type": "pact",
                "pact": "GroupReduce",
                "contents": "MIN(1)",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 8, "ship_strategy": "Forward"}
                ],
                "driver_strategy": "Sorted Combine",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 6,
                "type": "pact",
                "pact": "GroupReduce",
                "contents": "MIN(1)",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 7, "ship_strategy": "Hash Partition on [0]", "local_strategy": "Sort (combining) on [0:ASC]"}
                ],
                "driver_strategy": "Sorted Group Reduce",
                "global_properties": [
                        { "name": "Partitioning", "value": "HASH_PARTITIONED" },
                        { "name": "Partitioned on", "value": "[0]" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "[0:ASC]" },
                        { "name": "Grouped on", "value": "[0]" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 5,
                "type": "pact",
                "pact": "Join",
                "contents": "eu.stratosphere.api.java.operators.JoinOperator$DefaultJoinFunction",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 6, "side": "first", "ship_strategy": "Forward"},
                        {"id": 9, "side": "second", "ship_strategy": "Forward", "temp_mode": "PIPELINE_BREAKER"}
                ],
                "driver_strategy": "Merge",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 4,
                "type": "pact",
                "pact": "FlatMap",
                "contents": "eu.stratosphere.pact.compiler.IterationsCompilerTest$FlatMapJoin",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 5, "ship_strategy": "Forward"}
                ],
                "driver_strategy": "Map",
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        }
                ],
                "partial_solution": 9,
                "next_partial_solution": 4,
                "id": 1,
                "type": "bulk_iteration",
                "pact": "Bulk Iteration",
                "contents": "Bulk Iteration",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 2, "ship_strategy": "Forward"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "(unknown)" },
                        { "name": "Disk I/O", "value": "(unknown)" },
                        { "name": "CPU", "value": "(unknown)" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        },
        {
                "id": 0,
                "type": "sink",
                "pact": "Data Sink",
                "contents": "Print to System.out",
                "parallelism": "4",
                "subtasks_per_instance": "1",
                "predecessors": [
                        {"id": 1, "ship_strategy": "Forward"}
                ],
                "global_properties": [
                        { "name": "Partitioning", "value": "RANDOM" },
                        { "name": "Partitioning Order", "value": "(none)" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "local_properties": [
                        { "name": "Order", "value": "(none)" },
                        { "name": "Grouping", "value": "not grouped" },
                        { "name": "Uniqueness", "value": "not unique" }
                ],
                "estimates": [
                        { "name": "Est. Output Size", "value": "(unknown)" },
                        { "name": "Est. Cardinality", "value": "(unknown)" } ],
                "costs": [
                        { "name": "Network", "value": "0.0" },
                        { "name": "Disk I/O", "value": "0.0" },
                        { "name": "CPU", "value": "0.0" },
                        { "name": "Cumulative Network", "value": "(unknown)" },
                        { "name": "Cumulative Disk I/O", "value": "(unknown)" },
                        { "name": "Cumulative CPU", "value": "(unknown)" }
                ],
                "compiler_hints": [
                        { "name": "Output Size (bytes)", "value": "(none)" },
                        { "name": "Output Cardinality", "value": "(none)" },
                        { "name": "Avg. Output Record Size (bytes)", "value": "(none)" },
                        { "name": "Filter Factor", "value": "(none)" } ]
        }
        ]
}
{code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)