Access generic pojo fields

Access generic pojo fields

Amol S - iProgrammer
Hello Fabian,

I am streaming my MongoDB oplog with Flink and want to use the Flink Table API
to join multiple tables. My code looks like this:

DataStream<Oplog> streamSource = env
        .addSource(kafkaConsumer)
        .setParallelism(4);

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(streamSource);

Table master = table1.filter("ns === 'Master'")
        .select("o as master, o.applicationId as primaryKey");
Table child1 = table1.filter("ns === 'Child1'")
        .select("o as child1, o.applicationId as foreignKey");
Table child2 = table1.filter("ns === 'Child2'")
        .select("o as child2, o.applicationId as foreignKey2");

Table result = master.join(child1).where("primaryKey==foreignKey")
        .join(child2).where("primaryKey==foreignKey2");

It throws the error "Method threw
'org.apache.flink.table.api.ValidationException' exception. Undefined
function: APPLICATIONID"

public class Oplog implements Serializable{
    private BasicDBObject o;
}

Here o is a generic Java type used to fetch the MongoDB oplog, and I cannot
replace this generic type with static POJOs. Please suggest a workaround.

BasicDBObject satisfies the following two rules:

   - The class must be public.
   - It must have a public constructor without arguments (default constructor).

We can access class members through basicDBObject.getString("abc").

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
[hidden email]


*iProgrammer Solutions Pvt. Ltd.*



*Office 103, 104, 1st Floor Pride Portal,Shivaji Housing Society,
Bahiratwadi,Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016,
MH, INDIA.**Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <[hidden email]>
------------------------------------------------
Re: Access generic pojo fields

Timo Walther-2
Hi Amol,

The dot operation is reserved for calling functions on fields. If you
want to get a nested field in the Table API, use the
`.get("applicationId")` operation. See also [1] under "Value access
functions".

Regards,
Timo

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#built-in-functions


Re: Access generic pojo fields

Amol S - iProgrammer
Hello Timo,

Thanks for the quick reply. With your suggestion the previous exception is
gone, but I now get the following exception:

Expression 'o.get(_id) failed on input check: Cannot access field of
non-composite type 'GenericType<com.mongodb.BasicDBObject>'.

Re: Access generic pojo fields

Timo Walther-2
Hi,

I think the exception is self-explanatory. BasicDBObject is not
recognized as a POJO by Flink. A POJO is required so that the Table
API knows the field types for subsequent operations.

The easiest way is to implement your own scalar function, for example
`accessBasicDBObject(obj, key)`.
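
A minimal sketch of the lookup such a function might perform, written as plain Java so that it runs without Flink or the MongoDB driver on the classpath. It assumes the document can be treated as a Map<String, Object> (BasicDBObject is map-based). The class name and the choice to return an empty string for missing values are hypothetical, not something prescribed by Flink. In a real job the method would presumably sit in a class extending ScalarFunction, take a BasicDBObject, and be registered with the table environment before use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java stand-in for the body of a scalar function's eval method.
// Hypothetical name; in Flink this would extend ScalarFunction.
class AccessBasicDBObjectSketch {

    // Returns the value stored under `key` as a String, or an empty
    // string when the document, the key, or the value is missing.
    static String eval(Map<String, Object> obj, String key) {
        if (obj == null || key == null) {
            return "";
        }
        Object value = obj.get(key);
        return value == null ? "" : value.toString();
    }

    public static void main(String[] args) {
        // Stand-in for the "o" field of an oplog entry.
        Map<String, Object> o = new LinkedHashMap<>();
        o.put("applicationId", 42);

        System.out.println(eval(o, "applicationId")); // prints "42"
        System.out.println(eval(o, "missing").isEmpty()); // prints "true"
    }
}
```

Returning a String for every key keeps the result type fixed, which is what lets the Table API plan the operations that follow the call.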

Regards,
Timo


Re: Access generic pojo fields

Amol S - iProgrammer
Hello Timo,

I have implemented my own scalar function as follows:

public class AccessBasicDBObject extends ScalarFunction {

    public String eval(String key, BasicDBObject basicDBObject) {
        // Look the value up once; fall back to an empty string if absent.
        String value = basicDBObject.getString(key);
        return value != null ? value : "";
    }

    @Override
    public TypeInformation<?> getResultType(Class<?>[] signature) {
        return Types.STRING;
    }
}

Table master = table1.filter("ns === 'Master'")
        .select("o as master, 'accessBasicDBObject(applicationId,o)' as primaryKey");
Table child1 = table1.filter("ns === 'Child1'")
        .select("o as child1, 'accessBasicDBObject(applicationId,o)' as foreignKey");
Table child2 = table1.filter("ns === 'Child2'")
        .select("o as child2, 'accessBasicDBObject(applicationId,o)' as foreignKey2");

Table result = master.join(child1).where("primaryKey==foreignKey")
        .join(child2).where("primaryKey==foreignKey2");

It gives me the following error on the line
DataStream<Row> rowDataStream = tableEnv.toDataStream(result, Row.class);

Exception in thread "Thread-27" org.apache.flink.table.api.TableException:
Cannot generate a valid execution plan for the given query:

LogicalJoin(condition=[true], joinType=[inner])
  LogicalJoin(condition=[true], joinType=[inner])
    LogicalProject(master=[$2], primaryKey=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
      LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISMaster')])
        LogicalTableScan(table=[[_DataStreamTable_0]])
    LogicalProject(child1=[$2], foreignKey=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
      LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISChild1')])
        LogicalTableScan(table=[[_DataStreamTable_0]])
  LogicalProject(child2=[$2], foreignKey2=[_UTF-16LE'accessBasicDBObject(loanApplicationId,o)'])
    LogicalFilter(condition=[=($1, _UTF-16LE'analyticDB.customerMISChild2')])
      LogicalTableScan(table=[[_DataStreamTable_0]])

This exception indicates that the query uses an unsupported SQL feature.
Please check the documentation for the set of currently supported SQL
features.
at org.apache.flink.table.api.TableEnvironment.runVolcanoPlanner(TableEnvironment.scala:274)
at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:722)
at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:778)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:308)
at org.apache.flink.table.api.java.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:262)
at org.apache.flink.table.api.java.StreamTableEnvironment.toDataStream(StreamTableEnvironment.scala:159)
at com.softcell.streaming.flink.StreamingOperations$2.run(StreamingOperations.java:168)


Re: Access generic pojo fields

Timo Walther-2
I tried to reproduce your error but everything worked fine. Which Flink
version are you using?

Inner joins are a Flink 1.5 feature.


Re: Access generic pojo fields

Amol S - iProgrammer
Thanks Timo,

The custom function worked for me with no further exceptions.

Thanks.


