[GitHub] incubator-flink pull request: New operator map partition function

zentol
GitHub user kfleischmann opened a pull request:

    https://github.com/apache/incubator-flink/pull/42

    New operator map partition function

   

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/kfleischmann/incubator-flink new_operator_MapPartitionFunction

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-flink/pull/42.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #42
   
----
commit 4e1b57d71f3a141b8206a62f12a9e102b7903fad
Author: kay <[hidden email]>
Date:   2014-06-25T09:57:45Z

    add some functions

commit b8eeb0c4875f8e914b4665ce750baf698a45d53f
Author: kay <[hidden email]>
Date:   2014-06-25T10:22:43Z

    MapPartitionDriver added

commit f8eeb9944171ef30857560bf3e9ff36a3e2d4f82
Author: kay <[hidden email]>
Date:   2014-06-25T10:30:57Z

    add map partition driver strategy

commit 6bbb4cc043d86af0421a3d4228fea788f62dbe64
Author: kay <[hidden email]>
Date:   2014-06-25T10:38:48Z

    add map partition node map partition descriptor

commit dbfe953bbb3bfb0f2aafafb719599fea8b8d56e6
Author: kay <[hidden email]>
Date:   2014-06-25T12:40:36Z

    final version integration

commit d1df8307c641071812cc4d1f83ad2aef0eb06570
Author: kay <[hidden email]>
Date:   2014-06-25T12:46:48Z

    cleanup

commit dba2ce299164019322d5c71f6d7cbdf8c73f9869
Author: kay <[hidden email]>
Date:   2014-06-25T13:19:44Z

    cleanup

commit 1f8e5a9aff5d1a45004ecb4694674e140ecc4cd5
Author: kay <[hidden email]>
Date:   2014-06-25T13:20:55Z

    cleanup

commit 8dc6d70678de827882363f3616a70f0f4c5cff85
Author: kay <[hidden email]>
Date:   2014-06-25T13:37:30Z

    add estimation for map partition operator

commit 968a46e7e164130b864bf7e186c2e461168e118f
Author: kay <[hidden email]>
Date:   2014-06-25T13:44:23Z

    add map partition test

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [hidden email] or file a JIRA ticket
with INFRA.
---
zentol
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14186130
 
    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java ---
    @@ -23,52 +23,19 @@
     import java.util.Map;
     import java.util.Set;
     
    +import eu.stratosphere.api.common.operators.base.*;
    --- End diff --
   
    Please run "mvn verify" locally. We have maven checkstyle in place. It forbids using star imports.


---
zentol
In reply to this post by zentol
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14186141
 
    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java ---
    @@ -665,6 +632,9 @@ else if (c instanceof GenericDataSourceBase) {
      else if (c instanceof MapOperatorBase) {
      n = new MapNode((MapOperatorBase<?, ?, ?>) c);
      }
    +            else if (c instanceof MapPartitionOperatorBase) {
    --- End diff --
   
    We are using tabs instead of spaces.


---
zentol
In reply to this post by zentol
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14186207
 
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCount.java ---
    @@ -59,8 +59,11 @@ public static void main(String[] args) throws Exception {
     
      // get input data
      DataSet<String> text = getTextDataSet(env);
    -
    - DataSet<Tuple2<String, Integer>> counts =
    +
    +
    +
    +
    + DataSet<Tuple2<String, Integer>> counts =
    --- End diff --
   
    Can you remove these blank lines again?


---
Ufuk Celebi
In reply to this post by zentol
You can also do

        mvn checkstyle:check

if you only want to run checkstyle.

On 25 Jun 2014, at 15:51, rmetzger <[hidden email]> wrote:

> Github user rmetzger commented on a diff in the pull request:
>
>    https://github.com/apache/incubator-flink/pull/42#discussion_r14186130
>
>    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/PactCompiler.java ---
>    @@ -23,52 +23,19 @@
>     import java.util.Map;
>     import java.util.Set;
>
>    +import eu.stratosphere.api.common.operators.base.*;
>    --- End diff --
>
>    Please run "mvn verify" locally. We have maven checkstyle in place. It forbids using star imports.

---
zentol
In reply to this post by zentol
Github user rmetzger commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14186251
 
    --- Diff: stratosphere-examples/stratosphere-java-examples/src/main/java/eu/stratosphere/example/java/wordcount/WordCountWithMapPartition.java ---
    @@ -0,0 +1,162 @@
    +/***********************************************************************************************************************
    + *
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + *
    + **********************************************************************************************************************/
    +package eu.stratosphere.example.java.wordcount;
    +
    +import eu.stratosphere.api.java.DataSet;
    +import eu.stratosphere.api.java.ExecutionEnvironment;
    +import eu.stratosphere.api.java.functions.FlatMapFunction;
    +import eu.stratosphere.api.java.functions.MapPartitionFunction;
    +import eu.stratosphere.api.java.tuple.Tuple2;
    +import eu.stratosphere.example.java.wordcount.util.WordCountData;
    +import eu.stratosphere.util.Collector;
    +
    +import java.util.Iterator;
    +
    +/**
    + * Implements the "WordCount" program that computes a simple word occurrence histogram
    + * over text files.
    + *
    + * <p>
    + * The input is a plain text file with lines separated by newline characters.
    + *
    + * <p>
    + * Usage: <code>WordCount &lt;text path&gt; &lt;result path&gt;</code><br>
    + * If no parameters are provided, the program is run with default data from {@link WordCountData}.
    + *
    + * <p>
    + * This example shows how to:
    + * <ul>
    + * <li>write a simple Stratosphere program.
    + * <li>use Tuple data types.
    + * <li>write and use user-defined functions.
    + * </ul>
    + *
    + */
    +@SuppressWarnings("serial")
    +public class WordCountWithMapPartition {
    +
    + // *************************************************************************
    + //     PROGRAM
    + // *************************************************************************
    +
    + public static void main(String[] args) throws Exception {
    +
    + if(!parseParameters(args)) {
    + return;
    + }
    +
    + // set up the execution environment
    + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    +
    + // get input data
    + DataSet<String> text = getTextDataSet(env);
    +
    +
    +
    +
    --- End diff --
   
    Too many blank lines.


---
zentol
In reply to this post by zentol
Github user uce commented on the pull request:

    https://github.com/apache/incubator-flink/pull/42#issuecomment-47105893
 
    I think this will be a nice addition to the API.
   
    Would it make sense to rename the operator to flatMapPartition? The current name might be confusing in relation to the existing map and flatMap functions. Map is a 1:1 mapping, whereas flatMap is a 1:n mapping. The new operator is called `mapPartitions`, but it is able to collect multiple elements.
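
The naming concern above can be illustrated with a minimal sketch in plain Java. It does not use the Stratosphere/Flink API; the class `MapVariantsSketch`, its three helper methods, and the use of `java.util.function` types as stand-ins for the user-function interfaces and the `Collector` are all assumptions made for illustration. The point is only the shape of each contract: one output per element, any number of outputs per element, and one call per partition that may emit any number of outputs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-ins for the three operator contracts; not the real API.
final class MapVariantsSketch {

    // map: exactly one output per input element (1:1).
    static <I, O> List<O> map(List<I> input, Function<I, O> fn) {
        List<O> out = new ArrayList<>();
        for (I value : input) {
            out.add(fn.apply(value));
        }
        return out;
    }

    // flatMap: zero or more outputs per input element (1:n), emitted via a collector.
    static <I, O> List<O> flatMap(List<I> input, BiConsumer<I, Consumer<O>> fn) {
        List<O> out = new ArrayList<>();
        for (I value : input) {
            fn.accept(value, out::add);
        }
        return out;
    }

    // mapPartition: the function is called once per partition, receives an
    // iterator over all elements of that partition, and may emit any number of outputs.
    static <I, O> List<O> mapPartition(List<List<I>> partitions,
                                       BiConsumer<Iterator<I>, Consumer<O>> fn) {
        List<O> out = new ArrayList<>();
        for (List<I> partition : partitions) {
            fn.accept(partition.iterator(), out::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("to be", "or not", "to be");

        // 1:1, one length per line
        List<Integer> lengths = map(lines, String::length);

        // 1:n, several words per line
        List<String> words = flatMap(lines, (line, out) -> {
            for (String word : line.split(" ")) {
                out.accept(word);
            }
        });

        // Two partitions; the function emits one record count per partition.
        List<List<String>> partitions =
                Arrays.asList(lines.subList(0, 2), lines.subList(2, 3));
        List<Integer> counts = mapPartition(partitions, (it, out) -> {
            int n = 0;
            while (it.hasNext()) {
                it.next();
                n++;
            }
            out.accept(n);
        });

        System.out.println(lengths + " " + words + " " + counts);
    }
}
```

In this sketch the partition-wise function is free to emit one record, many records, or none, which is why the name invites comparison with flatMap rather than map.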


---
zentol
In reply to this post by zentol
Github user markus-h commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14190037
 
    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java ---
    @@ -0,0 +1,56 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.compiler.dag;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.operators.SingleInputOperator;
    +import eu.stratosphere.compiler.DataStatistics;
    +import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
    +import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
    +
    +/**
    + * The optimizer's internal representation of a <i>MapPartition</i> operator node.
    + */
    +public class MapPartitionNode extends SingleInputNode {
    +
    + /**
    + * Creates a new MapNode for the given contract.
    + *
    + * @param operator The map partition contract object.
    + */
    + public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
    + super(operator);
    + }
    +
    + @Override
    + public String getName() {
    + return "MapPartition";
    + }
    +
    + @Override
    + protected List<OperatorDescriptorSingle> getPossibleProperties() {
    + return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
    + }
    +
    + /**
    + * Computes the estimates for the MapPartition operator.
    + * We assume that by default, Map takes one value and transforms it into another value.
    + * The cardinality consequently stays the same.
    + */
    + @Override
    + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    + }
    --- End diff --
   
    Why did you leave this method empty?
    I think it should be something similar to the regular map:
    this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();


---
zentol
In reply to this post by zentol
Github user kfleischmann commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14192094
 
    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java ---
    @@ -0,0 +1,56 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.compiler.dag;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.operators.SingleInputOperator;
    +import eu.stratosphere.compiler.DataStatistics;
    +import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
    +import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
    +
    +/**
    + * The optimizer's internal representation of a <i>MapPartition</i> operator node.
    + */
    +public class MapPartitionNode extends SingleInputNode {
    +
    + /**
    + * Creates a new MapNode for the given contract.
    + *
    + * @param operator The map partition contract object.
    + */
    + public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
    + super(operator);
    + }
    +
    + @Override
    + public String getName() {
    + return "MapPartition";
    + }
    +
    + @Override
    + protected List<OperatorDescriptorSingle> getPossibleProperties() {
    + return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
    + }
    +
    + /**
    + * Computes the estimates for the MapPartition operator.
    + * We assume that by default, Map takes one value and transforms it into another value.
    + * The cardinality consequently stays the same.
    + */
    + @Override
    + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    + }
    --- End diff --
   
    Because we cannot make any predictions about the output of this operator.


---
zentol
In reply to this post by zentol
Github user uce commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14194572
 
    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java ---
    @@ -0,0 +1,56 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.compiler.dag;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.operators.SingleInputOperator;
    +import eu.stratosphere.compiler.DataStatistics;
    +import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
    +import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
    +
    +/**
    + * The optimizer's internal representation of a <i>MapPartition</i> operator node.
    + */
    +public class MapPartitionNode extends SingleInputNode {
    +
    + /**
    + * Creates a new MapNode for the given contract.
    + *
    + * @param operator The map partition contract object.
    + */
    + public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
    + super(operator);
    + }
    +
    + @Override
    + public String getName() {
    + return "MapPartition";
    + }
    +
    + @Override
    + protected List<OperatorDescriptorSingle> getPossibleProperties() {
    + return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
    + }
    +
    + /**
    + * Computes the estimates for the MapPartition operator.
    + * We assume that by default, Map takes one value and transforms it into another value.
    + * The cardinality consequently stays the same.
    + */
    + @Override
    + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    + }
    --- End diff --
   
    In FlatMapNode we use `getPredecessorNode().getEstimatedNumRecords() * 5` as an estimate, and in MapNode `getPredecessorNode().getEstimatedNumRecords()`. What is the difference that prevents us from doing the same here?


---
zentol
In reply to this post by zentol
Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/incubator-flink/pull/42#discussion_r14196207
 
    --- Diff: stratosphere-compiler/src/main/java/eu/stratosphere/compiler/dag/MapPartitionNode.java ---
    @@ -0,0 +1,56 @@
    +/***********************************************************************************************************************
    + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
    + *
    + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
    + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
    + * specific language governing permissions and limitations under the License.
    + **********************************************************************************************************************/
    +
    +package eu.stratosphere.compiler.dag;
    +
    +import java.util.Collections;
    +import java.util.List;
    +
    +import eu.stratosphere.api.common.operators.SingleInputOperator;
    +import eu.stratosphere.compiler.DataStatistics;
    +import eu.stratosphere.compiler.operators.MapPartitionDescriptor;
    +import eu.stratosphere.compiler.operators.OperatorDescriptorSingle;
    +
    +/**
    + * The optimizer's internal representation of a <i>MapPartition</i> operator node.
    + */
    +public class MapPartitionNode extends SingleInputNode {
    +
    + /**
    + * Creates a new MapNode for the given contract.
    + *
    + * @param operator The map partition contract object.
    + */
    + public MapPartitionNode(SingleInputOperator<?, ?, ?> operator) {
    + super(operator);
    + }
    +
    + @Override
    + public String getName() {
    + return "MapPartition";
    + }
    +
    + @Override
    + protected List<OperatorDescriptorSingle> getPossibleProperties() {
    + return Collections.<OperatorDescriptorSingle>singletonList(new MapPartitionDescriptor());
    + }
    +
    + /**
    + * Computes the estimates for the MapPartition operator.
    + * We assume that by default, Map takes one value and transforms it into another value.
    + * The cardinality consequently stays the same.
    + */
    + @Override
    + protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
    + }
    --- End diff --
   
    I would guess that a MapPartition function is used when multiple values should be aggregated in some (non-deterministic) way. Otherwise one would (and should!) use a Map- or FlatMapFunction.
   
    An estimate based on the input cardinality is not a good idea, IMHO.
    I would assume that the output cardinality is more likely to be around 1 * DOP (degree of parallelism), but this is just a gut feeling ;-)
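
The competing default estimates discussed in this thread can be put side by side in a small sketch. This is not the optimizer's code: the class `CardinalityEstimateSketch`, its method names, and the `UNKNOWN` sentinel are hypothetical, and the factor 5 and the "around 1 * DOP" guess are taken from the comments above rather than from any verified source file.

```java
// Hypothetical side-by-side of the default cardinality estimates mentioned in the thread.
final class CardinalityEstimateSketch {

    // Assumed sentinel for "no estimate available".
    static final long UNKNOWN = -1L;

    // Map is 1:1, so the output cardinality equals the input cardinality.
    static long estimateMap(long inputRecords) {
        return inputRecords;
    }

    // FlatMap is 1:n; the thread quotes a default expansion factor of 5.
    static long estimateFlatMap(long inputRecords) {
        return inputRecords < 0 ? UNKNOWN : inputRecords * 5;
    }

    // Option A (the pull request as submitted): make no prediction at all.
    static long estimateMapPartitionUnset() {
        return UNKNOWN;
    }

    // Option B (the gut feeling above): roughly one output record per
    // parallel instance, independent of the input cardinality.
    static long estimateMapPartitionPerInstance(int degreeOfParallelism) {
        return degreeOfParallelism;
    }

    public static void main(String[] args) {
        long input = 1_000_000L;
        int dop = 8;
        System.out.println("map:                " + estimateMap(input));
        System.out.println("flatMap:            " + estimateFlatMap(input));
        System.out.println("mapPartition (A):   " + estimateMapPartitionUnset());
        System.out.println("mapPartition (B):   " + estimateMapPartitionPerInstance(dop));
    }
}
```

The two MapPartition options differ in what they depend on: option A leaves downstream estimates unknown, while option B ties the estimate to the parallelism and ignores the input size entirely.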

