On Thu, Apr 16, 2015 at 2:28 PM, Felix Neutatz <[hidden email]> wrote:

Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src && nodeID = target")
  .groupBy('communityID)
  .select("communityID, weight.sum as sumTotal").toSet[CommunitySumTotal]

but I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
    at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
    at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)

Moreover, when I use the following where clause:

.where("nodeID = src || nodeID = target")

I get another error:

Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)

Apart from that, the Table API seems really promising. It's a really great tool.

Thank you for your help,

Felix
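(For reference: the same pipeline translates without error once the join condition is a single equi-join predicate. A minimal sketch, continuing from the snippet above with the same imports and vals, and assuming the 0.9-era Scala Table API; note it matches a community member against the edge source only, which is yet another semantics than the AND or OR variants above:)

// Join community memberships to edges on the source node only,
// then sum the edge weights per community.
val sumBySrc = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src")
  .groupBy('communityID)
  .select("communityID, weight.sum as sumTotal")
  .toSet[CommunitySumTotal]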
On Fri, Apr 17, 2015 at 9:42 AM, Aljoscha Krettek <[hidden email]> wrote:

So, the first thing is a "feature" of the Java API that removes duplicate fields from keys: an equi-join of key (0,0) with key (0,1) throws an error because one of the 0s is removed from the first key, leaving the two sides with different numbers of key fields. This is what "nodeID = src && nodeID = target" runs into, since nodeID appears twice on the left side.

The second thing is a feature of the Table API, and the error message is hinting at the problem:

Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target

The problem is that this would have to be executed as a cross followed by a filter, because, due to the OR, neither predicate is an equi-join predicate that must always hold. I don't want to allow that, because a cross can be very expensive. I will open a JIRA ticket for adding a manual cross operation to the Table API.
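(The cross-then-filter plan the translator refuses to build can be spelled out by hand on the DataSet API. A minimal sketch, assuming the 0.9-era Scala DataSet API, where cross and filter are ordinary operators:)

import org.apache.flink.api.scala._

// Build every (community, edge) pair explicitly, then keep only the
// pairs the OR predicate accepts. The cross materializes
// |communities| * |weightedEdges| pairs before the filter runs --
// exactly the expense the Table API guards against.
val matched = communities.cross(weightedEdges)
  .filter { case (c, e) => c.nodeID == e.src || c.nodeID == e.target }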
On Apr 17, 2015 at 10:09 AM, Till Rohrmann <[hidden email]> wrote:

Why not do two separate joins, union the results, and run a distinct operation on the combined key?
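(Till's suggestion, sketched on the DataSet API rather than the Table API. A minimal sketch, assuming the 0.9-era Scala DataSet API; taking distinct over the complete (Community, WeightedEdge) pairs and the final aggregation steps are illustrative choices, not part of the original mail:)

import org.apache.flink.api.scala._

// One equi-join per branch of the OR predicate.
val bySrc    = communities.join(weightedEdges).where("nodeID").equalTo("src")
val byTarget = communities.join(weightedEdges).where("nodeID").equalTo("target")

// Union the two results; distinct over the full pairs drops matches
// that satisfied both branches, so nothing is counted twice.
val matched = bySrc.union(byTarget).distinct()

// Sum the edge weights per community.
val sumTotal = matched
  .map { case (c, e) => (c.communityID, e.weight) }
  .groupBy(0)
  .sum(1)
  .map { case (id, total) => CommunitySumTotal(id, total) }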
On Fri, Apr 17, 2015 at 10:20 AM, Felix Neutatz <[hidden email]> wrote:

I am also against the manual cross method. Isn't the idea of the Table API to hide the actual implementation from the user?

Best regards,
Felix
On Fri, Apr 17, 2015 at 4:52 AM (GMT-05:00), Aljoscha Krettek <[hidden email]> wrote:

Yes, that is the idea, but I think in this case the user must be protected from an operation that can get ridiculously expensive.
On Fri, Apr 17, 2015 at 12:41 PM, Fabian Hueske <[hidden email]> wrote:

I agree with Aljoscha.
Let's give a good error message and offer a cross operator.
I also agree with Aljoscha.

It is widely considered a big mistake in SQL that cross products are implicit rather than explicit (because they are so expensive). Let's not make the same mistake here by putting the theoretical algebra above practical user experience and program safety.