Hello Fabian,
I am streaming my MongoDB oplog using Flink and want to use the Flink Table API to join multiple tables. My code looks like this:

    DataStream<Oplog> streamSource = env
            .addSource(kafkaConsumer)
            .setParallelism(4);

    StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
    // Convert the DataStream into a Table with default fields "f0", "f1"
    Table table1 = tableEnv.fromDataStream(streamSource);

    Table master = table1.filter("ns === 'Master'").select("o as master, o.applicationId as primaryKey");
    Table child1 = table1.filter("ns === 'Child1'").select("o as child1, o.applicationId as foreignKey");
    Table child2 = table1.filter("ns === 'Child2'").select("o as child2, o.applicationId as foreignKey2");

    Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");

It is throwing the error "Method threw 'org.apache.flink.table.api.ValidationException' exception. Undefined function: APPLICATIONID".

    public class Oplog implements Serializable {
        private BasicDBObject o;
    }

Here `o` is a generic Java type for fetching the MongoDB oplog, and I cannot replace this generic type with static POJOs. Please tell me about any workaround for this.

BasicDBObject satisfies the following two rules:

- The class must be public.
- It must have a public constructor without arguments (default constructor).

We can access class members through basicDBObject.getString("abc").

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
[hidden email]

*iProgrammer Solutions Pvt. Ltd.*
*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society, Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <[hidden email]>
------------------------------------------------
Hi Amol,
the dot operation is reserved for calling functions on fields. If you want to access a nested field in the Table API, use the `.get("applicationId")` operation. See also [1] under "Value access functions".

Regards,
Timo

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#built-in-functions

On 27.07.18 09:10, Amol S - iProgrammer wrote:
> [...]
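Concretely (a sketch only, not tested against this job), the suggestion would change the projections from the dot syntax to the `get` value-access call:

```java
// Hypothetical rewrite of the original projection: o.get('applicationId')
// instead of o.applicationId, which the parser reads as a function call.
Table master = table1
    .filter("ns === 'Master'")
    .select("o as master, o.get('applicationId') as primaryKey");
```

The child tables and the joins would change in the same way.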
Hello Timo,
Thanks for the quick reply. With your suggestion the previous exception is gone, but it now gives me the following exception:

    Expression 'o.get(_id) failed on input check: Cannot access field of non-composite type 'GenericType<com.mongodb.BasicDBObject>'.

On Fri, Jul 27, 2018 at 1:08 PM, Timo Walther <[hidden email]> wrote:
> [...]
Hi,
I think the exception is self-explanatory: BasicDBObject is not recognized as a POJO by Flink. A POJO is required so that the Table API knows the types of the fields for the following operations.

The easiest way is to implement your own scalar function, e.g. an `accessBasicDBObject(obj, key)`.

Regards,
Timo

On 27.07.18 11:25, Amol S - iProgrammer wrote:
> [...]
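For what it's worth, the null-safe lookup inside such a scalar function can be sketched in plain Java. The example below uses a `Map` as a stand-in for `BasicDBObject` (whose `getString(key)` behaves similarly), so it runs without the MongoDB driver on the classpath:

```java
import java.util.HashMap;
import java.util.Map;

public class AccessSketch {

    // Stand-in for BasicDBObject.getString(key): returns the value as a
    // String, or "" when the key is absent, so join keys are never null.
    static String access(Map<String, Object> doc, String key) {
        Object value = doc.get(key);
        return value != null ? value.toString() : "";
    }

    public static void main(String[] args) {
        Map<String, Object> oplogEntry = new HashMap<>();
        oplogEntry.put("applicationId", "APP-42");

        System.out.println(access(oplogEntry, "applicationId")); // APP-42
        System.out.println(access(oplogEntry, "missing"));       // empty string
    }
}
```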
Hello Timo,
I have implemented my own scalar function as below:

    public class AccessBasicDBObject extends ScalarFunction {

        public String eval(String key, BasicDBObject basicDBObject) {
            if (basicDBObject.getString(key) != null)
                return basicDBObject.getString(key);
            else
                return "";
        }

        @Override
        public TypeInformation<?> getResultType(Class<?>[] signature) {
            return Types.STRING;
        }
    }

    Table master = table1.filter("ns === 'Master'").select("o as master, 'accessBasicDBObject(applicationId,o)' as primaryKey");
    Table child1 = table1.filter("ns === 'Child1'").select("o as child1, 'accessBasicDBObject(applicationId,o)' as foreignKey");
    Table child2 = table1.filter("ns === 'Child2'").select("o as child2, 'accessBasicDBObject(applicationId,o)' as foreignKey2");

    Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");

It gives me the following error on the line `DataStream<Row> rowDataStream = tableEnv.toDataStream(result, Row.class);`:

    Exception in thread "Thread-27" org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query:

    LogicalJoin(condition=[true], joinType=[inner])
      LogicalJoin(condition=[true], joinType=[inner])
        LogicalProject(master=[$2], primaryKey=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
          LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISMaster')])
            LogicalTableScan(table=[[_DataStreamTable_0]])
        LogicalProject(child1=[$2], foreignKey=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
          LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISChild1')])
            LogicalTableScan(table=[[_DataStreamTable_0]])
      LogicalProject(child2=[$2], foreignKey2=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
        LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISChild2')])
          LogicalTableScan(table=[[_DataStreamTable_0]])

    This exception indicates that the query uses an unsupported SQL feature.
    Please check the documentation for the set of currently supported SQL features.
        at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
        at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:722)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:778)
        at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
        at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
        at org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:159)
        at com.softcell.streaming.flink.StreamingOperations$2.run(StreamingOperations.java:168)

On Fri, Jul 27, 2018 at 3:13 PM, Timo Walther <[hidden email]> wrote:
> [...]
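One detail that may be worth double-checking (an observation from the plan output, not something confirmed in this thread): in the `select(...)` strings above, the call is wrapped in single quotes, so the expression parser sees `'accessBasicDBObject(applicationId,o)'` as a string literal rather than a function call — which would explain the `_UTF-16LE'accessBasicDBObject(...)'` literals in the logical plan. A sketch of how the registration and call usually look in the string-based Table API (argument order taken from the `eval(String key, BasicDBObject obj)` signature above):

```java
// Register the scalar function under a name before using it in expressions.
tableEnv.registerFunction("accessBasicDBObject", new AccessBasicDBObject());

// Call it unquoted; only the key argument is a string literal.
Table master = table1
    .filter("ns === 'Master'")
    .select("o as master, accessBasicDBObject('applicationId', o) as primaryKey");
```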
I tried to reproduce your error but everything worked fine. Which Flink
version are you using? Inner joins are a Flink 1.5 feature.

On 27.07.18 13:28, Amol S - iProgrammer wrote:
> Table master = table1.filter("ns === 'Master'").select("o as master, 'accessBasicDBObject(applicationId,o)' as primaryKey");
> Table child1 = table1.filter("ns === 'Child1'").select("o as child1, 'accessBasicDBObject(applicationId,o)' as foreignKey");
> Table child2 = table1.filter("ns === 'Child2'").select("o as child2, 'accessBasicDBObject(applicationId,o)' as foreignKey2");
>
> Table result = master.join(child1).where("primaryKey==foreignKey").join(child2).where("primaryKey==foreignKey2");
Thanks Timo,
The custom function worked for me with no further exceptions. Thanks.

On Fri, Jul 27, 2018 at 6:10 PM, Timo Walther <[hidden email]> wrote:
> [...]