Hello,
I am streaming the MongoDB oplog through Kafka into Flink and want to join multiple tables using the Flink Table API. Is it possible to join streamed tables in Flink? If so, please point me to an example of a stream join using the Table API.

I went through your dynamic table documentation. It is quite interesting, but I have not found any example or tutorial on how to implement a dynamic table.

I tried to implement a Table API join using a POJO class, but it fails with org.apache.flink.table.api.TableException: Cannot generate a valid execution plan for the given query

-----------------------------------------------
*Amol Suryawanshi*
Java Developer
[hidden email]

*iProgrammer Solutions Pvt. Ltd.*
*Office 103, 104, 1st Floor Pride Portal, Shivaji Housing Society, Bahiratwadi, Near Hotel JW Marriott, Off Senapati Bapat Road, Pune - 411016, MH, INDIA.*
*Phone: +91 9689077510 | Skype: amols_iprogrammer*
www.iprogrammer.com <[hidden email]>
------------------------------------------------
Hi Amol,
The “dynamic table” is only a logical concept that the Flink Table API is designed around, so you don’t need to implement dynamic tables yourself.

The Flink Table API has provided several kinds of stream-to-stream joins since version 1.4. The related docs can be found here: https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/tableApi.html#joins

Best,
Xingcan
Hello Xingcan
DataStream<Oplog> streamSource = env
    .addSource(kafkaConsumer)
    .setParallelism(4);

StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
// Convert the DataStream into a Table with default fields "f0", "f1"
Table table1 = tableEnv.fromDataStream(streamSource);

Table customerMISMaster = table1.filter("ns === 'local.customerMISMaster'").select("o as master");
Table customerMISChild1 = table1.filter("ns === 'local.customerMISChild1'").select("o as child1");
Table customerMISChild2 = table1.filter("ns === 'local.customerMISChild2'").select("o as child2");
Table result = customerMISMaster.join(customerMISChild1)
    .where("master.loanApplicationId=child1.loanApplicationId");

This throws the error "Method threw 'org.apache.flink.table.api.ValidationException' exception. Undefined function: LOANAPPLICATIONID"
Hi,
It looks like the type of master is not known to Flink. What's the output of

customerMISMaster.printSchema();

?

Best, Fabian
Hello Xingcan
As mentioned earlier in this thread, I am streaming the MongoDB oplog in order to join multiple Mongo tables on a unique (primary) key. To achieve this I created the Java POJO below, where o is MongoDB's generic BasicDBObject type that holds my table fields, which are dynamic. I now want to use a Table API join over this BasicDBObject, but it seems Flink does not allow generic POJOs. Please advise.

public class Oplog {
    private OplogTimestamp ts;
    private BasicDBObject o;
}
Hello Fabian,
The output of customerMISMaster.printSchema() is undefined.
Hi Amol,
These are the requirements for POJOs [1] that are fully supported by Flink.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/api_concepts.html#pojos
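As a rough illustration of the POJO rules linked above (a sketch, not code from this thread): the Flink documentation describes a POJO as a public class with a public no-argument constructor whose fields are either public or reachable through conventional getters and setters. The names PojoSketch, CustomerMaster, and customerName are hypothetical, and treating loanApplicationId as a String is an assumption. By comparison, an Oplog class with only private fields and no accessors would not meet these rules.

```java
// Outer holder class, used only so the sketch fits in one file.
// In a real job the record type would typically be a public top-level class.
class PojoSketch {

    /** Hypothetical record type; names are illustrative, not from the thread. */
    public static class CustomerMaster {
        // A public field is accepted as-is by the POJO rules.
        public String loanApplicationId;

        // A private field needs a conventional getter and setter.
        private String customerName;

        // The public no-argument constructor is required.
        public CustomerMaster() {}

        public CustomerMaster(String loanApplicationId, String customerName) {
            this.loanApplicationId = loanApplicationId;
            this.customerName = customerName;
        }

        public String getCustomerName() {
            return customerName;
        }

        public void setCustomerName(String customerName) {
            this.customerName = customerName;
        }
    }

    public static void main(String[] args) {
        CustomerMaster m = new CustomerMaster("LA-1", "Asha");
        System.out.println(m.loanApplicationId + " " + m.getCustomerName());
    }
}
```

With a class shaped like this, each field becomes a named, typed column that a join condition can refer to, which is not possible for an opaque generic object.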
Hello Fabian,
My requirements do not allow me to create static POJOs for all classes, because I want to create jobs dynamically for all tables based on a rule engine configuration. Please suggest whether there is any other way to achieve this.
You can also use Row, but then you cannot rely on automatic type extraction
and must provide the TypeInformation yourself.
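The idea behind this suggestion can be sketched without Flink. When the schema is only known at runtime (for example from a rule engine config), each schemaless document is flattened into a positional row, and the field names and types are supplied explicitly instead of being extracted from a POJO class. The sketch below is plain Java, not Flink API: `DynamicRowSketch`, `toRow`, and the field names are hypothetical. In a real job this role would presumably be played by `org.apache.flink.types.Row` together with a `RowTypeInfo` built from the same configured names and types.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

public class DynamicRowSketch {

    // Flattens a schemaless document into a positional row. The names and
    // types stand in for a schema loaded from configuration (an assumption).
    static Object[] toRow(Map<String, Object> doc, String[] names, Class<?>[] types) {
        if (names.length != types.length) {
            throw new IllegalArgumentException("names and types must have the same length");
        }
        Object[] row = new Object[names.length];
        for (int i = 0; i < names.length; i++) {
            Object value = doc.get(names[i]);
            // Missing fields become null; present fields must match the declared type.
            if (value != null && !types[i].isInstance(value)) {
                throw new IllegalArgumentException(
                        "field " + names[i] + " is not a " + types[i].getSimpleName());
            }
            row[i] = value;
        }
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> doc = new HashMap<>();
        doc.put("loanApplicationId", 42L);
        doc.put("status", "OPEN");
        doc.put("notInSchema", true); // dropped, because the schema does not list it

        Object[] row = toRow(doc,
                new String[] {"loanApplicationId", "status"},
                new Class<?>[] {Long.class, String.class});
        System.out.println(Arrays.toString(row));
    }
}
```

Because the join key ends up as its own top-level column, a later join can refer to it by name rather than reaching into a nested generic object.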
Hello Fabian,
Can you please tell me how to convert a Table back into a DataStream? I just want to print the table result.
Hello Fabian,
I tried to convert the table into a stream as below:

    tableEnv.toDataStream(result, Oplog.class);

and it gives me the error below.

Cannot generate a valid execution plan for the given query:

LogicalFilter(condition=[<>($1, $3)])
  LogicalJoin(condition=[true], joinType=[inner])
    LogicalProject(master=[$1], timeStamp=[$5])
      LogicalFilter(condition=[=($0, _UTF-16LE'local.customerMISMaster')])
        LogicalTableScan(table=[[_DataStreamTable_0]])
    LogicalProject(child1=[$1], timeStamp2=[$5])
      LogicalFilter(condition=[=($0, _UTF-16LE'local.customerMISChild1')])
        LogicalTableScan(table=[[_DataStreamTable_0]])

This exception indicates that the query uses an unsupported SQL feature. Please check the documentation for the set of currently supported SQL features.
Cross joins are not supported.
You should add an equality join predicate.
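The plan in the error message shows `LogicalJoin(condition=[true])` with only a `<>` filter on top, which is a cross join followed by an inequality filter. With an equality predicate, rows can be grouped by key so that each row only meets rows carrying the same key. The plain-Java sketch below illustrates that difference with a hash join on `loanApplicationId`. It is not Flink API: `EquiJoinSketch`, `record`, and `hashJoin` are hypothetical names, and the records are made-up sample data. In the Table API, the join examples in the documentation linked earlier in the thread take the form `left.join(right).where("a = d")`, where both sides of the predicate are top-level columns.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class EquiJoinSketch {

    // Builds a record from alternating key/value arguments (hypothetical helper).
    static Map<String, Object> record(Object... keyValues) {
        Map<String, Object> r = new HashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            r.put((String) keyValues[i], keyValues[i + 1]);
        }
        return r;
    }

    // Inner equi-join: index the left side by key, then probe with each right row.
    static List<Map<String, Object>> hashJoin(List<Map<String, Object>> left,
                                              List<Map<String, Object>> right,
                                              String key) {
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> l : left) {
            Object k = l.get(key);
            if (k == null) continue; // rows without a key never match
            index.computeIfAbsent(k, x -> new ArrayList<>()).add(l);
        }
        List<Map<String, Object>> joined = new ArrayList<>();
        for (Map<String, Object> r : right) {
            Object k = r.get(key);
            if (k == null) continue;
            List<Map<String, Object>> matches =
                    index.getOrDefault(k, Collections.<Map<String, Object>>emptyList());
            for (Map<String, Object> l : matches) {
                Map<String, Object> out = new HashMap<>(l);
                out.putAll(r); // right-side fields overwrite same-named left fields
                joined.add(out);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> masters = new ArrayList<>();
        masters.add(record("loanApplicationId", 1, "master", "m1"));
        masters.add(record("loanApplicationId", 2, "master", "m2"));

        List<Map<String, Object>> children = new ArrayList<>();
        children.add(record("loanApplicationId", 2, "child1", "c2"));
        children.add(record("loanApplicationId", 3, "child1", "c3"));

        // Only the pair sharing loanApplicationId = 2 is produced; a cross
        // join would have produced all four combinations.
        System.out.println(hashJoin(masters, children, "loanApplicationId"));
    }
}
```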