TableAPI - Join on two keys

TableAPI - Join on two keys

Felix Neutatz
Hi,

I want to join two tables in the following way:

case class WeightedEdge(src: Int, target: Int, weight: Double)
case class Community(communityID: Int, nodeID: Int)

case class CommunitySumTotal(communityID: Int, sumTotal: Double)

val communities: DataSet[Community]
val weightedEdges: DataSet[WeightedEdge]

val communitiesTable = communities.toTable
val weightedEdgesTable = weightedEdges.toTable

val sumTotal = communitiesTable.join(weightedEdgesTable)
  .where("nodeID = src && nodeID = target")
  .groupBy('communityID)
  .select("communityID, weight.sum as sumTotal")
  .toSet[CommunitySumTotal]


but I get this exception:

Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: The types of the key fields do not match: The number of specified keys is different.
    at org.apache.flink.api.java.operators.JoinOperator.<init>(JoinOperator.java:96)
    at org.apache.flink.api.java.operators.JoinOperator$EquiJoin.<init>(JoinOperator.java:197)
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:310)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)

Moreover, when I use the following where clause:

.where("nodeID = src || nodeID = target")

I get another error:

Exception in thread "main" org.apache.flink.api.table.ExpressionException: Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target.
    at org.apache.flink.api.java.table.JavaBatchTranslator.createJoin(JavaBatchTranslator.scala:296)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:145)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:195)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translateInternal(JavaBatchTranslator.scala:183)
    at org.apache.flink.api.java.table.JavaBatchTranslator.translate(JavaBatchTranslator.scala:78)
    at org.apache.flink.api.scala.table.ScalaBatchTranslator.translate(ScalaBatchTranslator.scala:55)
    at org.apache.flink.api.scala.table.TableConversions.toSet(TableConversions.scala:37)


Apart from that, the Table API seems really promising. It's a really great tool.

Thank you for your help,

Felix

Re: TableAPI - Join on two keys

Aljoscha Krettek-2
So, the first issue is a "feature" of the Java API that removes duplicate fields from keys: an equi-join of key fields (0,0) with (0,1) throws an error because one of the 0s is removed from the first key, leaving the two sides with a different number of key fields.
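
For illustration, here is a minimal standalone sketch of the same problem at the DataSet API level, reusing the case classes from your snippet (this is my reading of what the Table API translation boils down to, not code from your program):

import org.apache.flink.api.scala._

// The left key list ("nodeID", "nodeID") is deduplicated to a single
// field, while the right key list keeps both fields, so the key arities
// no longer match and the join is rejected:
val joined = communities.join(weightedEdges)
  .where("nodeID", "nodeID")   // collapses to one key field
  .equalTo("src", "target")    // still two key fields -> InvalidProgramException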

The second issue is a limitation of the Table API, and here the error message hints at the problem:

Could not derive equi-join predicates for predicate 'nodeID === 'src || 'nodeID === 'target

The problem is that this join would have to be executed as a cross followed by a filter, because with the OR none of the predicates is an equi-join predicate that must always hold. I don't want to allow that implicitly, because a cross can be very expensive. I will open a JIRA ticket for adding an explicit cross operation to the Table API.
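
To make the cost concrete, this is roughly what would have to happen under the hood, sketched with the DataSet API's explicit cross (just a sketch: every community gets paired with every edge before the filter runs):

val orJoin = communities.cross(weightedEdges)   // |communities| x |weightedEdges| pairs
  .filter { case (c, e) => c.nodeID == e.src || c.nodeID == e.target }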

Re: TableAPI - Join on two keys

till.rohrmann
Why not do two separate joins, union the results, and run a distinct operation on the combined key?
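
A sketch of that at the DataSet level with your two DataSets (I am not sure the Table API currently exposes union/distinct, so this stays with the DataSet API; the tuple layout is just one possible choice):

import org.apache.flink.api.scala._

// Join once per predicate, emitting (communityID, src, target, weight):
val bySrc = communities.join(weightedEdges)
  .where("nodeID").equalTo("src") { (c, e) => (c.communityID, e.src, e.target, e.weight) }
val byTarget = communities.join(weightedEdges)
  .where("nodeID").equalTo("target") { (c, e) => (c.communityID, e.src, e.target, e.weight) }

// Union both results, drop edges that matched on both predicates, and
// aggregate the weights per community:
val sumTotal = bySrc.union(byTarget)
  .distinct()
  .map(t => (t._1, t._4))
  .groupBy(0)
  .sum(1)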

Re: TableAPI - Join on two keys

Felix Neutatz
I am also against the manual cross method. Isn't the idea of the Table API to hide the actual implementation from the user?

Best regards,
Felix

Re: TableAPI - Join on two keys

Aljoscha Krettek-2
Yes, that is the idea, but I think in this case the user must be
protected from an operation that can get ridiculously expensive.

Re: TableAPI - Join on two keys

Fabian Hueske-2
I agree with Aljoscha.
Let's give a good error message and offer a cross operator.
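
For concreteness, a purely hypothetical sketch of what such an explicit operator could look like on the tables from the original mail (to be clear, no cross exists on the Table API today; this is only what the proposal might translate to):

// Hypothetical, proposed API -- not available in the current Table API:
val pairs = communitiesTable.cross(weightedEdgesTable)
  .where("nodeID = src || nodeID = target")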

Re: TableAPI - Join on two keys

Stephan Ewen
I also agree with Aljoscha.

It is widely considered a big mistake in SQL that cross products are implicit rather than explicit (because they are so expensive).

Let's not make the same mistake here by putting theoretical algebra above practical user experience and program safety.
