Markus Holzemer created FLINK-1018:
-------------------------------------- Summary: Logistic Regression deadlocks Key: FLINK-1018 URL: https://issues.apache.org/jira/browse/FLINK-1018 Project: Flink Issue Type: Bug Reporter: Markus Holzemer Attachments: LogisticRegression.java We are currently running our implementation of logistic regression with batch gradient descent on the cluster. Unfortunately, for datasets > 1 GB it seems to deadlock inside the iteration. This means the first iteration is never finished. The iteration does a map over all points; the map gets the iteration input as a broadcast variable. The result of the map is reduced and the result of the reducer (1 tuple) is crossed with the iteration input. There should be no reason for the deadlock, since the data is still quite small compared to the cluster size (4 nodes with 32 GB each). Also, the data size stays constant throughout the algorithm. Here is the generated plan. I will also attach the full algorithm. {code} { "nodes": [ { "id": 2, "type": "source", "pact": "Data Source", "contents": "[([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.", "parallelism": "1", "subtasks_per_instance": "1", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. 
Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "0.0 B" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "step_function": [ { "id": 7, "type": "source", "pact": "Data Source", "contents": "TextInputFormat (D:/Devel/HIGGS-0.0001.csv) - UTF-8", "parallelism": "2", "subtasks_per_instance": "2", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "83.27 MB" }, { "name": "Est. Cardinality", "value": "113.9. K" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "83.27 MB" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "83.27 MB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 6, "type": "pact", "pact": "Map", "contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$6", "parallelism": "2", "subtasks_per_instance": "2", "predecessors": [ {"id": 7, "ship_strategy": "Forward"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "113.9. K" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "83.27 MB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 9, "type": "pact", "pact": "Map", "contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$1", "parallelism": "2", "subtasks_per_instance": "2", "predecessors": [ {"id": 6, "ship_strategy": "Forward"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. 
Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "113.9. K" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "41.63 MB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 8, "type": "pact", "pact": "Reduce", "contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$2", "parallelism": "1", "subtasks_per_instance": "1", "predecessors": [ {"id": 9, "ship_strategy": "Forward"} ], "driver_strategy": "Reduce All", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "41.63 MB" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 10, "type": "pact", "pact": "Bulk Partial Solution", "contents": "Partial Solution", "parallelism": "2", "subtasks_per_instance": "2", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "0.0 B" }, { "name": "Cumulative CPU", "value": "0.0 " } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 5, "type": "pact", "pact": "Map", "contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$3", "parallelism": "2", "subtasks_per_instance": "2", "predecessors": [ {"id": 6, "side": "first", "ship_strategy": "Forward", "temp_mode": "CACHED"}, {"id": 8, "side": "second", "ship_strategy": "Broadcast"}, {"id": 10, "side": "second", "ship_strategy": "Broadcast"} ], "driver_strategy": "Map", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "113.9. K" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 4, "type": "pact", "pact": "Reduce", "contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4", "parallelism": "1", "subtasks_per_instance": "1", "predecessors": [ {"id": 5, "ship_strategy": "Forward"} ], "driver_strategy": "Reduce All", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "0.0 B" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 3, "type": "pact", "pact": "Cross", "contents": "de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$5", "parallelism": "2", "subtasks_per_instance": "2", "predecessors": [ {"id": 4, "side": "first", "ship_strategy": "Forward"}, {"id": 10, "side": "second", "ship_strategy": "Broadcast", "temp_mode": "PIPELINE_BREAKER"} ], "driver_strategy": "Nested Loops (Blocked Outer: de.tu_berlin.impro3.stratosphere.classification.logreg.LogisticRegression$4)", "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. 
Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ], "partial_solution": 10, "next_partial_solution": 3, "id": 1, "type": "bulk_iteration", "pact": "Bulk Iteration", "contents": "Bulk Iteration", "parallelism": "2", "subtasks_per_instance": "2", "predecessors": [ {"id": 2, "ship_strategy": "Forward"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "(unknown)" }, { "name": "Disk I/O", "value": "(unknown)" }, { "name": "CPU", "value": "(unknown)" }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] }, { "id": 0, "type": "sink", "pact": "Data Sink", "contents": "TextOutputFormat (D:/Devel/theta) - UTF-8", "parallelism": "2", "subtasks_per_instance": "2", "predecessors": [ {"id": 1, "ship_strategy": "Forward"} ], "global_properties": [ { "name": "Partitioning", "value": "RANDOM" }, { "name": "Partitioning Order", "value": "(none)" }, { "name": "Uniqueness", "value": "not unique" } ], "local_properties": [ { "name": "Order", "value": "(none)" }, { "name": "Grouping", "value": "not grouped" }, { "name": "Uniqueness", "value": "not unique" } ], "estimates": [ { "name": "Est. 
Output Size", "value": "(unknown)" }, { "name": "Est. Cardinality", "value": "(unknown)" } ], "costs": [ { "name": "Network", "value": "0.0 B" }, { "name": "Disk I/O", "value": "0.0 B" }, { "name": "CPU", "value": "0.0 " }, { "name": "Cumulative Network", "value": "(unknown)" }, { "name": "Cumulative Disk I/O", "value": "(unknown)" }, { "name": "Cumulative CPU", "value": "(unknown)" } ], "compiler_hints": [ { "name": "Output Size (bytes)", "value": "(none)" }, { "name": "Output Cardinality", "value": "(none)" }, { "name": "Avg. Output Record Size (bytes)", "value": "(none)" }, { "name": "Filter Factor", "value": "(none)" } ] } ] } {code} -- This message was sent by Atlassian JIRA (v6.2#6252) |
Free forum by Nabble | Edit this page |