Multiple select single result


Multiple select single result

dhanuka ranasinghe
Hi All,

I am trying to select multiple results from Kafka and send the results to a different Kafka topic using the Table API, but I am getting the error below. Could you please help me with this?

Query:

SELECT TiggerID, 'Rule3' AS RuleName, mytime(ts) AS ts1, mytime(CURRENT_TIMESTAMP) AS ts2
FROM dataTable
WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
UNION
SELECT TiggerID, 'Rule2' AS RuleName, mytime(ts) AS ts1, mytime(CURRENT_TIMESTAMP) AS ts2
FROM dataTable
WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
UNION
SELECT TiggerID, 'Rule1' AS RuleName, mytime(ts) AS ts1, mytime(CURRENT_TIMESTAMP) AS ts2
FROM dataTable
WHERE InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'


*Error:*

2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
	at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
	... 3 more
Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
	at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
	at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
	at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
	... 4 more
Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
	at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
	at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
	at org.apache.flink.table.api.Table.insertInto(table.scala:877)
	at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
	... 9 more


*Source Code:*








StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
tableEnv.registerFunction("mytime", new MyTime(10));

tableEnv.connect(new Kafka().version("0.10")
        .topic("testin")
        .properties(kConsumer)
        .startFromLatest())
    .withFormat(new Json()
        .jsonSchema(schemaContent)
        .failOnMissingField(false)
        .deriveSchema())
    .withSchema(new Schema()
        .field("InterceptID", "DECIMAL")
        .field("Provider_Info", "VARCHAR")
        .field("LIID", "VARCHAR")
        .field("TiggerID", "DECIMAL")
        .field("ts", Types.SQL_TIMESTAMP())
        .rowtime(new Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
    .inAppendMode()
    .registerTableSource(sourceTable);

// WindowedTable windowedTable = tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
// tableEnv.sqlQuery(query)

StringBuilder multi = new StringBuilder();
for (String sql : rules) {
    if (multi.length() > 0) {
        multi.append(" UNION ").append("\n");
    }
    multi.append(sql);
}
LOGGER.info("********************************* " + multi.toString());

Table result = tableEnv.sqlQuery(multi.toString());

tableEnv
    // declare the external system to connect to
    .connect(new Kafka().version("0.10")
        .topic("testout")
        .startFromEarliest()
        .properties(kProducer))
    .withFormat(new Json().failOnMissingField(false).deriveSchema())
    .withSchema(new Schema()
        .field("TiggerID", Types.DECIMAL())
        .field("RuleName", Types.STRING())
        .field("ts1", Types.STRING())
        .field("ts2", Types.STRING()))
    // specify the update-mode for streaming tables
    .inAppendMode()
    // register as source, sink, or both under a name
    .registerTableSourceAndSink("ruleTable");

// tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
result.insertInto("ruleTable");

Cheers,
Dhanuka



--
Nothing Impossible,Creativity is more important than knowledge.

Re: Multiple select single result

Fabian Hueske-2
Hi Dhanuka,

The important error message here is "AppendStreamTableSink requires that Table has only insert changes".
This is because you use UNION instead of UNION ALL, which implies duplicate elimination.
Unfortunately, UNION is currently implemented internally as a regular aggregation, which produces a retraction stream (although this would not be necessary).

If you don't require duplicate elimination, you can replace UNION with UNION ALL and the query should work.
If you require duplicate elimination, it is currently not possible to use
SQL for your use case.

There is a Jira issue, FLINK-9422, to improve this case [1].

Best, Fabian

[1] https://issues.apache.org/jira/browse/FLINK-9422
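Concretely, the fix amounts to one change in the loop that concatenates the rule queries. The sketch below is plain Java with hypothetical rule strings; `combine()` is an assumed helper name, not from the original code:

```java
import java.util.Arrays;
import java.util.List;

// Minimal sketch: join the per-rule SELECTs with UNION ALL instead of UNION,
// so no duplicate elimination (and hence no retraction stream) is introduced
// and the result stays append-only, as the Kafka sink requires.
public class UnionAllBuilder {

    static String combine(List<String> rules) {
        StringBuilder multi = new StringBuilder();
        for (String sql : rules) {
            if (multi.length() > 0) {
                // " UNION ALL " instead of " UNION ": keeps insert-only changes
                multi.append(" UNION ALL ").append("\n");
            }
            multi.append(sql);
        }
        return multi.toString();
    }

    public static void main(String[] args) {
        List<String> rules = Arrays.asList(
                "SELECT TiggerID, 'Rule1' AS RuleName FROM dataTable",
                "SELECT TiggerID, 'Rule2' AS RuleName FROM dataTable");
        String query = combine(rules);
        System.out.println(query.contains(" UNION ALL "));
        System.out.println(query.contains("Rule1") && query.contains("Rule2"));
    }
}
```

Since every branch selects from the same table with the same column types, UNION ALL simply interleaves the streams and each input row maps to exactly one insert in the output.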

On Sun, 13 Jan 2019 at 14:43, dhanuka ranasinghe <[hidden email]> wrote:

Re: Multiple select single result

dhanuka ranasinghe
Hi Fabian,

Thanks for the prompt reply; it's working 🤗.

I am trying to deploy 200 SQL unions, and it seems all the tasks start failing after some time.

How do I allocate memory for the task manager and the job manager? What factors need to be considered?

Cheers
Dhanuka

On Sun, 13 Jan 2019, 22:05 Fabian Hueske <[hidden email]> wrote:


Re: Multiple select single result

Hequn Cheng
Hi dhanuka,

> I am trying to deploy 200 SQL unions, and it seems all the tasks start failing after some time.
It would be great if you could show us some information (say, the exception stack) about the failure. Is it caused by an OOM in the job manager?

> How do I allocate memory for the task manager and the job manager? What factors need to be considered?
According to your SQL, I guess you need more memory for the job manager [1]: since you UNION ALL 200 tables, the job graph should be a bit big. As for the task manager, I think it may be OK to use the default memory settings, unless you allocate a lot of memory in your UDFs or you just want to make better use of the memory (we can discuss more if you like).

Best, Hequn

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
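For reference, the heap sizes discussed here are set in flink-conf.yaml. The keys below are the ones used by Flink 1.7-era releases; the values are illustrative starting points, not recommendations:

```yaml
# flink-conf.yaml -- illustrative values only; tune for your workload.
# A large job graph (e.g. 200 unioned SELECTs) mostly stresses the JobManager heap.
jobmanager.heap.size: 2048m
taskmanager.heap.size: 4096m
# Number of slots per TaskManager; all slots share the same heap.
taskmanager.numberOfTaskSlots: 4
```

After changing these, the JobManager and TaskManagers must be restarted for the new heap sizes to take effect.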

On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
[hidden email]> wrote:


Re: Multiple select single result

dhanuka ranasinghe
Hi Hequn,

I think it's obvious when we see the job graph for 200 unions; I have attached a screenshot herewith.

I also tried out a different approach, which is, instead of UNION ALL


On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <[hidden email]> wrote:
Hi dhanuka,

> I am trying to deploy 200 SQL unions and it seems all the tasks getting failing after some time.
Would be great if you can show us some information(say exception stack) about the failure. Is it caused by OOM of job manager? 

> How do i allocate memory for task manager and job manager. What are the factors need to be considered .
According to your SQL, I guess you need more memory for the job manager[1] since you unionAll 200 tables, the job graph should be a bit big. As for the taskmanger, I think it may be ok to use the default memory setting unless you allocate a lot of memory in your UDFs or you just want to make better use of the memory(we can discuss more if you like).  

Best, Hequn


On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <[hidden email]> wrote:
Hi Fabian,

Thanks for the prompt reply and its working 🤗.

I am trying to deploy 200 SQL unions and it seems all the tasks getting
failing after some time.

How do i allocate memory for task manager and job manager. What are the
factors need to be considered .

Cheers
Dhanuka

On Sun, 13 Jan 2019, 22:05 Fabian Hueske <[hidden email] wrote:

> Hi Dhanuka,
>
> The important error message here is "AppendStreamTableSink requires that
> Table has only insert changes".
> This is because you use UNION instead of UNION ALL, which implies
> duplicate elimination.
> Unfortunately, UNION is currently internally implemented as a regular
> aggregration which produces a retraction stream (although, this would not
> be necessary).
>
> If you don't require duplicate elimination, you can replace UNION by UNION
> ALL and the query should work.
> If you require duplicate elimination, it is currently not possible to use
> SQL for your use case.
>
> There is thea Jira issue FLINK-9422 to improve this case [1].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9422
>
> Am So., 13. Jan. 2019 um 14:43 Uhr schrieb dhanuka ranasinghe <
> [hidden email]>:
>
>> Hi All,
>>
>> I am trying to select multiple results from Kafka and send results to
>> Kafka
>> different topic using Table API. But I am getting below error. Could you
>> please help me on this.
>>
>> Query:
>>
>> SELECT TiggerID,'Rule3' as RuleName,mytime(ts) as ts1 ,
>> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
>> 4508724
>> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>  UNION
>> SELECT TiggerID,'Rule2' as RuleName,mytime(ts) as ts1 ,
>> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
>> 4508724
>> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>  UNION
>> SELECT TiggerID,'Rule1' as RuleName,mytime(ts) as ts1 ,
>> mytime(CURRENT_TIMESTAMP) as ts2 FROM dataTable WHERE InterceptID =
>> 4508724
>> AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>
>>
>> Error:
>>
>> 2019-01-13 21:36:36,228 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler - Exception occurred in REST handler.
>> org.apache.flink.runtime.rest.handler.RestHandlerException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$7(JarRunHandler.java:151)
>>     at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:870)
>>     at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:852)
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1595)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:228)
>>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>>     ... 3 more
>> Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error.
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:546)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>     at org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>     at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>     at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>     ... 4 more
>> Caused by: org.apache.flink.table.api.TableException: AppendStreamTableSink requires that Table has only insert changes.
>>     at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>     at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>     at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>     at org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>     ... 9 more
>>
>>
>> Source Code:
>>
>> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>> tableEnv.registerFunction("mytime", new MyTime(10));
>> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>         .properties(kConsumer)
>>         .startFromLatest())
>>     .withFormat(new Json().jsonSchema(schemaContent)
>>         .failOnMissingField(false).deriveSchema())
>>     .withSchema(new Schema()
>>         .field("InterceptID", "DECIMAL")
>>         .field("Provider_Info", "VARCHAR")
>>         .field("LIID", "VARCHAR")
>>         .field("TiggerID", "DECIMAL")
>>         .field("ts", Types.SQL_TIMESTAMP())
>>             .rowtime(new Rowtime().timestampsFromSource()
>>                 .watermarksPeriodicBounded(1000)))
>>     .inAppendMode()
>>     .registerTableSource(sourceTable);
>>
>> // WindowedTable windowedTable =
>> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>> // tableEnv.sqlQuery(query)
>>
>> StringBuilder multi = new StringBuilder();
>> for (String sql : rules) {
>>     if (multi.length() > 0) {
>>         multi.append(" UNION ").append("\n");
>>     }
>>     multi.append(sql);
>> }
>> LOGGER.info("********************************* " + multi.toString());
>> Table result = tableEnv.sqlQuery(multi.toString());
>>
>> tableEnv
>>     // declare the external system to connect to
>>     .connect(new Kafka().version("0.10")
>>         .topic("testout").startFromEarliest()
>>         .properties(kProducer))
>>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>     .withSchema(new Schema()
>>         .field("TiggerID", Types.DECIMAL())
>>         .field("RuleName", Types.STRING())
>>         .field("ts1", Types.STRING())
>>         .field("ts2", Types.STRING()))
>>     // specify the update-mode for streaming tables
>>     .inAppendMode()
>>     // register as source, sink, or both and under a name
>>     .registerTableSourceAndSink("ruleTable");
>>
>> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>> result.insertInto("ruleTable");
>>
>> Cheers,
>> Dhanuka
>>
>>
>>
>> --
>> Nothing Impossible,Creativity is more important than knowledge.
>>
>


--
Nothing Impossible,Creativity is more important than knowledge.
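
Fabian's UNION ALL suggestion can be illustrated with a minimal, Flink-free sketch of how the posted StringBuilder loop could join the per-rule SELECT statements. The class and method names here are mine for illustration, not from the original code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class UnionAllBuilder {

    // Join per-rule SELECT statements with UNION ALL instead of UNION, so the
    // resulting Table produces only insert changes and an append-only sink
    // (such as the Kafka sink) can consume it without retraction support.
    static String buildQuery(List<String> ruleQueries) {
        return ruleQueries.stream().collect(Collectors.joining("\nUNION ALL\n"));
    }

    public static void main(String[] args) {
        List<String> rules = Arrays.asList(
            "SELECT TiggerID, 'Rule1' AS RuleName FROM dataTable WHERE InterceptID = 4508724",
            "SELECT TiggerID, 'Rule2' AS RuleName FROM dataTable WHERE InterceptID = 4508724");
        System.out.println(buildQuery(rules));
    }
}
```

Unlike UNION, UNION ALL performs no duplicate elimination, which is why it avoids the retraction stream described above.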
Re: Multiple select single result

dhanuka ranasinghe
Sorry about sending the mail without completing it :)


I also tried out a different approach: instead of UNION ALL, use OR, as below.

( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' 
) OR 
( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' 
) OR 
( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%' 
)

The only downside is that, with this approach, if all the WHERE-clause condition sets are equal, Flink seems to use only one condition set.

I have attached a screenshot herewith.

Could you please explain this to me? Thanks in advance.

Cheers,
Dhanuka
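
The OR rewrite above can be sketched as a small helper that builds one disjunctive predicate, so a single SELECT with one scan replaces many unioned SELECTs. This is a hypothetical illustration, not the original code:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class OrPredicateBuilder {

    // Wrap each rule's WHERE condition in parentheses and OR them together,
    // so one SELECT with a single scan/filter replaces many unioned SELECTs.
    static String buildWhere(List<String> conditions) {
        return conditions.stream()
                .map(c -> "( " + c + " )")
                .collect(Collectors.joining(" OR "));
    }

    public static void main(String[] args) {
        List<String> conditions = Arrays.asList(
            "InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'",
            "InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'");
        System.out.println("SELECT TiggerID FROM dataTable WHERE " + buildWhere(conditions));
    }
}
```

Note that if all condition sets are identical, the disjunction `p OR p OR ...` simplifies to `p`, so the planner keeping only one condition set would be logically expected behavior.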

On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <[hidden email]> wrote:
Hi Hequn,

I think it's obvious when we look at the job graph for the 200 unions. I have attached the screenshot herewith.

I also tried out a different approach: instead of UNION ALL


On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <[hidden email]> wrote:
Hi dhanuka,

> I am trying to deploy 200 SQL unions and it seems all the tasks start failing after some time.
It would be great if you could show us some information (say, the exception stack) about the failure. Is it caused by an OOM of the job manager?

> How do I allocate memory for the task manager and job manager? What are the factors that need to be considered?
According to your SQL, I guess you need more memory for the job manager [1], since you UNION ALL 200 tables and the job graph should be a bit big. As for the task manager, I think it may be OK to use the default memory setting, unless you allocate a lot of memory in your UDFs or you just want to make better use of the memory (we can discuss more if you like).

Best, Hequn

[1] https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
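
The job manager heap Hequn refers to is set in flink-conf.yaml. As a sketch only, with placeholder values that would have to be sized for the actual 200-union job graph:

```yaml
# flink-conf.yaml (sketch): raise the JobManager heap for a large job graph
jobmanager.heap.size: 2048m
# TaskManager defaults are often fine unless UDFs hold a lot of memory
taskmanager.heap.size: 4096m
```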


On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <[hidden email]> wrote:
Hi Fabian,

Thanks for the prompt reply, and it's working 🤗.

I am trying to deploy 200 SQL unions, and it seems all the tasks start
failing after some time.

How do I allocate memory for the task manager and job manager? What are the
factors that need to be considered?

Cheers
Dhanuka

On Sun, 13 Jan 2019, 22:05 Fabian Hueske <[hidden email]> wrote:

> Hi Dhanuka,
>
> The important error message here is "AppendStreamTableSink requires that
> Table has only insert changes".
> This is because you use UNION instead of UNION ALL, which implies
> duplicate elimination.
> Unfortunately, UNION is currently internally implemented as a regular
> aggregation which produces a retraction stream (although this would not
> be necessary).
>
> If you don't require duplicate elimination, you can replace UNION by UNION
> ALL and the query should work.
> If you require duplicate elimination, it is currently not possible to use
> SQL for your use case.
>
> There is a Jira issue, FLINK-9422, to improve this case [1].
>
> Best, Fabian
>
> [1] https://issues.apache.org/jira/browse/FLINK-9422

--
Nothing Impossible,Creativity is more important than knowledge.
Re: Multiple select single result

Fabian Hueske-2
Hi,

you should avoid the UNION ALL approach, because the query will scan the
(identical?) Kafka topic 200 times, which is highly inefficient.
You should rather use your second approach and scale the query
appropriately.

Best, Fabian
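
"Scaling the query" here presumably means raising the operator parallelism of the single combined query. As a sketch via flink-conf.yaml, with a placeholder value:

```yaml
# flink-conf.yaml (sketch): run the single query's operators with more parallel subtasks
parallelism.default: 8
```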

On Mon., Jan 14, 2019 at 08:39, dhanuka ranasinghe <[hidden email]> wrote:

Re: Multiple select single result

dhanuka ranasinghe
Hi Fabian,

I encountered the error below with 200 OR operators, so I guess this is a
JVM-level limitation.

Error:

Code of class "DataStreamCalcRule" grows beyond 64 KB

Cheers
Dhanuka
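
This failure occurs because the code Flink generates for the single huge filter exceeds the JVM's 64 KB per-method bytecode limit. One possible workaround, sketched here under my own naming rather than any established Flink API, is to split the conditions into smaller groups and submit one query per group:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PredicateChunker {

    // Split a long list of OR-ed rule conditions into smaller groups so the
    // filter code generated for each query stays below the JVM's 64 KB
    // per-method bytecode limit. Each returned chunk would back one
    // sqlQuery(); the chunk size is a guess to be tuned empirically.
    static List<String> chunkConditions(List<String> conditions, int chunkSize) {
        List<String> chunks = new ArrayList<>();
        for (int i = 0; i < conditions.size(); i += chunkSize) {
            int end = Math.min(i + chunkSize, conditions.size());
            chunks.add("( " + String.join(" ) OR ( ", conditions.subList(i, end)) + " )");
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<String> conditions = new ArrayList<>();
        for (int i = 0; i < 200; i++) {
            conditions.add("InterceptID = " + i);
        }
        // 200 conditions in groups of 50 -> 4 moderately sized WHERE clauses
        System.out.println(chunkConditions(conditions, 50).size());
    }
}
```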


On Mon, 14 Jan 2019, 20:30 Fabian Hueske <[hidden email]> wrote:

>>>>> >> .withSchema(new Schema() .field("InterceptID", "DECIMAL")
>>>>> >> .field("Provider_Info", "VARCHAR") .field("LIID", "VARCHAR")
>>>>> >> .field("TiggerID", "DECIMAL") .field("ts", Types.SQL_TIMESTAMP())
>>>>> >> .rowtime(new
>>>>> >> Rowtime().timestampsFromSource().watermarksPeriodicBounded(1000)))
>>>>> >> .inAppendMode() .registerTableSource(sourceTable); // WindowedTable
>>>>> >> windowedTable = //
>>>>> >> tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>>>> >> //tableEnv.sqlQuery(query) StringBuilder multi = new
>>>>> StringBuilder();
>>>>> >> for(String sql : rules) {     if(multi.length() > 0) {
>>>>> multi.append("
>>>>> >> UNION
>>>>> >> ").append("\n");     }     multi.append( sql);      }
>>>>> >> LOGGER.info("********************************* " +
>>>>> multi.toString());
>>>>> >> Table
>>>>> >> result = tableEnv.sqlQuery(multi.toString()); tableEnv // declare
>>>>> the
>>>>> >> external system to connect to .connect(new Kafka().version("0.10")
>>>>> >> .topic("testout").startFromEarliest() .properties(kProducer) )
>>>>> >> .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>>>> >> .withSchema(new Schema() .field("TiggerID", Types.DECIMAL())
>>>>> >> .field("RuleName", Types.STRING()) .field("ts1", Types.STRING())
>>>>> >> .field("ts2", Types.STRING()) ) // specify the update-mode for
>>>>> streaming
>>>>> >> tables .inAppendMode() // register as source, sink, or both and
>>>>> under a
>>>>> >> name .registerTableSourceAndSink("ruleTable"); //tableEnv.sqlUpdate(
>>>>> >> "INSERT INTO ruleTable " + result); result.insertInto("ruleTable");
>>>>> >> Cheers,Dhanuka*
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> --
>>>>> >> Nothing Impossible,Creativity is more important than knowledge.
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>
>>> --
>>> Nothing Impossible,Creativity is more important than knowledge.
>>>
>>
>>
>> --
>> Nothing Impossible,Creativity is more important than knowledge.
>>
>
On 14 Jan 2019 20:30, "Fabian Hueske" <[hidden email]> wrote:

Hi,

you should avoid the UNION ALL approach because the query will scan the
(identical?) Kafka topic 200 times, which is highly inefficient.
You should instead use your second approach and scale the query
appropriately.

Best, Fabian
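The single-scan approach recommended here can be sketched roughly as follows. This is an illustrative helper, not code from the thread: the class and method names are made up, and the predicate strings are placeholders for the actual rule conditions.

```java
import java.util.List;
import java.util.stream.Collectors;

// Rough sketch of the single-scan alternative to UNION ALL: instead of one
// SELECT per rule (each of which scans the Kafka topic again), OR the rule
// predicates into a single WHERE clause so the topic is scanned only once.
class SingleScanQuery {
    public static String build(String selectList, String table, List<String> predicates) {
        // Wrap each predicate in parentheses and join with OR
        String where = predicates.stream()
                .map(p -> "(" + p + ")")
                .collect(Collectors.joining(" OR "));
        return "SELECT " + selectList + " FROM " + table + " WHERE " + where;
    }
}
```

Applied to the thread's example, each rule's WHERE clause becomes one parenthesized disjunct, and the resulting job graph contains a single Kafka source instead of one per rule.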


Re: Multiple select single result

Fabian Hueske-2
Hi,

That's a Java limitation: the JVM rejects any method whose bytecode exceeds
64 KB, and the code generated for this predicate exceeds that limit.
There is a Jira issue to fix the problem.

In the meantime, I'd follow a hybrid approach and UNION ALL only as many
tables as you need to avoid the code-compilation exception.

Best, Fabian
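One way to read this hybrid approach, sketched as a hypothetical helper (the names and the batch size below are illustrative assumptions, not from the thread): group the rule predicates into small batches, OR the predicates within a batch into one SELECT, and UNION ALL the batches, so that each generated filter method stays below the JVM's 64 KB bytecode limit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for the hybrid approach: ORs predicates within a batch
// (few enough that the generated filter method stays under 64 KB of bytecode)
// and combines the batches with UNION ALL (not UNION, which would produce a
// retraction stream that an AppendStreamTableSink cannot consume).
class QueryBatcher {
    public static String buildQuery(String selectList, String table,
                                    List<String> predicates, int batchSize) {
        List<String> batches = new ArrayList<>();
        for (int i = 0; i < predicates.size(); i += batchSize) {
            List<String> slice =
                    predicates.subList(i, Math.min(i + batchSize, predicates.size()));
            // OR the predicates of one batch into a single WHERE clause
            String where = "(" + String.join(") OR (", slice) + ")";
            batches.add("SELECT " + selectList + " FROM " + table + " WHERE " + where);
        }
        return String.join("\nUNION ALL\n", batches);
    }
}
```

The batch size would have to be tuned empirically against the `grows beyond 64 KB` error, since the generated code size depends on the complexity of each predicate.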

On Mon, Jan 14, 2019 at 14:15, dhanuka ranasinghe <[hidden email]> wrote:

> Hi Fabian,
>
> I encountered the error below with 200 OR operators, so I guess this is a
> JVM-level limitation.
>
> Error:
>
> of class "datastreamcalcrule" grows beyond 64 kb
>
> Cheers
> Dhanuka
>
>
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:78)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:120)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$getJobGraphAsync$10(JarRunHandler.java:226)
>>>>> >> ... 4 more
>>>>> >> Caused by: org.apache.flink.table.api.TableException:
>>>>> >> AppendStreamTableSink
>>>>> >> requires that Table has only insert changes.
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:382)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:784)
>>>>> >> at org.apache.flink.table.api.Table.insertInto(table.scala:877)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.monitoring.stream.analytics.FlinkDynamicLocalSQL.main(FlinkDynamicLocalSQL.java:153)
>>>>> >> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>> >> at java.lang.reflect.Method.invoke(Method.java:498)
>>>>> >> at
>>>>> >>
>>>>> >>
>>>>> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>>>>> >> ... 9 more
>>>>> >>
>>>>> >>
>>>>> >> *Source Code:*
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
>>>>> >> tableEnv.registerFunction("mytime", new MyTime(10));
>>>>> >> tableEnv.connect(new Kafka().version("0.10").topic("testin")
>>>>> >>         .properties(kConsumer)
>>>>> >>         .startFromLatest())
>>>>> >>     .withFormat(new Json().jsonSchema(schemaContent)
>>>>> >>         .failOnMissingField(false).deriveSchema())
>>>>> >>     .withSchema(new Schema()
>>>>> >>         .field("InterceptID", "DECIMAL")
>>>>> >>         .field("Provider_Info", "VARCHAR")
>>>>> >>         .field("LIID", "VARCHAR")
>>>>> >>         .field("TiggerID", "DECIMAL")
>>>>> >>         .field("ts", Types.SQL_TIMESTAMP())
>>>>> >>             .rowtime(new Rowtime().timestampsFromSource()
>>>>> >>                 .watermarksPeriodicBounded(1000)))
>>>>> >>     .inAppendMode()
>>>>> >>     .registerTableSource(sourceTable);
>>>>> >>
>>>>> >> // WindowedTable windowedTable =
>>>>> >> //     tableEnv.scan(sourceTable).window(Tumble.over("50.minutes"));
>>>>> >> // tableEnv.sqlQuery(query)
>>>>> >> StringBuilder multi = new StringBuilder();
>>>>> >> for (String sql : rules) {
>>>>> >>     if (multi.length() > 0) {
>>>>> >>         multi.append(" UNION ").append("\n");
>>>>> >>     }
>>>>> >>     multi.append(sql);
>>>>> >> }
>>>>> >> LOGGER.info("********************************* " + multi.toString());
>>>>> >> Table result = tableEnv.sqlQuery(multi.toString());
>>>>> >>
>>>>> >> tableEnv
>>>>> >>     // declare the external system to connect to
>>>>> >>     .connect(new Kafka().version("0.10").topic("testout")
>>>>> >>         .startFromEarliest().properties(kProducer))
>>>>> >>     .withFormat(new Json().failOnMissingField(false).deriveSchema())
>>>>> >>     .withSchema(new Schema()
>>>>> >>         .field("TiggerID", Types.DECIMAL())
>>>>> >>         .field("RuleName", Types.STRING())
>>>>> >>         .field("ts1", Types.STRING())
>>>>> >>         .field("ts2", Types.STRING()))
>>>>> >>     // specify the update-mode for streaming tables
>>>>> >>     .inAppendMode()
>>>>> >>     // register as source, sink, or both under a name
>>>>> >>     .registerTableSourceAndSink("ruleTable");
>>>>> >> // tableEnv.sqlUpdate("INSERT INTO ruleTable " + result);
>>>>> >> result.insertInto("ruleTable");
>>>>> >>
>>>>> >> Cheers,
>>>>> >> Dhanuka
>>>>> >>
>>>>> >>
>>>>> >>
>>>>> >> --
>>>>> >> Nothing Impossible,Creativity is more important than knowledge.
>>>>> >>
>>>>> >
>>>>>
>>>>
>>>
>
>

Re: Multiple select single result

dhanuka ranasinghe
Hi Fabian,

+1 👍

Cheers
Dhanuka

On Mon, 14 Jan 2019, 21:29 Fabian Hueske <[hidden email] wrote:

> Hi,
>
> That's a Java limitation. Methods cannot be larger than 64 KB, and the code
> that is generated for this predicate exceeds the limit.
> There is a Jira issue to fix the problem.
>
> In the meantime, I'd follow a hybrid approach and UNION ALL only as many
> tables as you need to avoid the code compilation exception.
>
> Best, Fabian
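Fabian's hybrid approach can be sketched as: partition the rule list into fixed-size batches and build one UNION ALL query per batch, so no single generated method grows past the 64 KB limit. A plain-Java sketch under the assumption that each batch is then submitted as its own query (the batch size of 50 is illustrative, not a tested threshold):

```java
import java.util.ArrayList;
import java.util.List;

public class RuleBatcher {
    // Splits the rules into batches of at most batchSize and builds one
    // UNION ALL query string per batch. Each resulting query is small
    // enough that the generated code stays under the JVM method limit.
    static List<String> buildBatchedQueries(List<String> rules, int batchSize) {
        List<String> queries = new ArrayList<>();
        for (int i = 0; i < rules.size(); i += batchSize) {
            List<String> batch =
                rules.subList(i, Math.min(i + batchSize, rules.size()));
            queries.add(String.join("\n UNION ALL \n", batch));
        }
        return queries;
    }

    public static void main(String[] args) {
        List<String> rules = new ArrayList<>();
        for (int i = 1; i <= 200; i++) {
            rules.add("SELECT TiggerID, 'Rule" + i + "' AS RuleName FROM dataTable");
        }
        // 200 rules in batches of 50 -> 4 queries
        System.out.println(buildBatchedQueries(rules, 50).size());
    }
}
```

The right batch size has to be found empirically, since the 64 KB limit applies to the generated bytecode, not to the SQL string length.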
>
> Am Mo., 14. Jan. 2019 um 14:15 Uhr schrieb dhanuka ranasinghe <
> [hidden email]>:
>
>> Hi Fabian ,
>>
>> I encountered the error below with 200 OR operators, so I guess this is a
>> JVM-level limitation.
>>
>> Error :
>>
>> of class "datastreamcalcrule" grows beyond 64 kb
>>
>> Cheers
>> Dhanuka
>>
>>
>> On Mon, 14 Jan 2019, 20:30 Fabian Hueske <[hidden email] wrote:
>>
>>> Hi,
>>>
>>> you should avoid the UNION ALL approach because the query will scan the
>>> (identical?) Kafka topic 200 times, which is highly inefficient.
>>> You should rather use your second approach and scale the query
>>> appropriately.
>>>
>>> Best, Fabian
>>>
>>> Am Mo., 14. Jan. 2019 um 08:39 Uhr schrieb dhanuka ranasinghe <
>>> [hidden email]>:
>>>
>>>> Sorry about sending the mail before completing it :) ,
>>>>
>>>>
>>>> I also tried out a different approach: instead of UNION ALL, use OR, as
>>>> below.
>>>>
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>> ) OR
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>> ) OR
>>>> ( InterceptID = 4508724 AND Provider_Info = 'Dhanuka' AND LIID LIKE '%193400835%'
>>>> )
>>>>
>>>> The only downside is that, with this approach, if all the WHERE-clause condition sets are equal, Flink seems to evaluate only one condition set.
>>>>
>>>> I have attached a screenshot herewith.
>>>>
>>>> Could you please explain this behavior? Thanks in advance.
>>>>
>>>> Cheers,
>>>>
>>>> Dhanuka
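A plausible explanation for the behavior dhanuka observes: identical OR branches are logically redundant, so the planner may merge them into a single condition set. If the rule list can contain repeats, one option (a sketch, not a tested workaround) is to deduplicate the predicate strings before building the combined WHERE clause, so only genuinely distinct condition sets are OR-ed together:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.stream.Collectors;

public class OrPredicateBuilder {
    // Deduplicates predicate strings (preserving first-seen order) and
    // joins the distinct ones with OR, each wrapped in parentheses.
    static String buildWhere(List<String> predicates) {
        return new LinkedHashSet<>(predicates).stream()
                .map(p -> "( " + p + " )")
                .collect(Collectors.joining(" OR "));
    }

    public static void main(String[] args) {
        List<String> preds = List.of(
            "InterceptID = 4508724 AND LIID LIKE '%193400835%'",
            "InterceptID = 4508724 AND LIID LIKE '%193400835%'", // duplicate
            "InterceptID = 9999999");
        System.out.println(buildWhere(preds));
    }
}
```

Note this only catches textually identical predicates; semantically equivalent but differently written conditions would still be left to the planner.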
>>>>
>>>>
>>>> On Mon, Jan 14, 2019 at 3:35 PM dhanuka ranasinghe <
>>>> [hidden email]> wrote:
>>>>
>>>>> Hi Hequn,
>>>>>
>>>>> I think it's obvious when we see the job graph for 200 unions. I have
>>>>> attached the screenshot herewith.
>>>>>
>>>>> I also tried out a different approach, which is instead of UNION ALL
>>>>>
>>>>>
>>>>> On Mon, Jan 14, 2019 at 2:57 PM Hequn Cheng <[hidden email]>
>>>>> wrote:
>>>>>
>>>>>> Hi dhanuka,
>>>>>>
>>>>>> > I am trying to deploy 200 SQL unions and it seems all the tasks
>>>>>> start failing after some time.
>>>>>> It would be great if you could show us some information (say, the
>>>>>> exception stack) about the failure. Is it caused by an OOM in the job
>>>>>> manager?
>>>>>>
>>>>>> > How do I allocate memory for the task manager and job manager? What
>>>>>> factors need to be considered?
>>>>>> According to your SQL, I guess you need more memory for the job
>>>>>> manager [1], since you UNION ALL 200 tables and the job graph should be
>>>>>> a bit big. As for the task manager, I think it may be OK to use the
>>>>>> default memory setting, unless you allocate a lot of memory in your
>>>>>> UDFs or you just want to make better use of the memory (we can discuss
>>>>>> more if you like).
>>>>>>
>>>>>> Best, Hequn
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-stable/ops/config.html#jobmanager
>>>>>>
>>>>>> On Mon, Jan 14, 2019 at 9:54 AM dhanuka ranasinghe <
>>>>>> [hidden email]> wrote:
>>>>>>
>>>>>>> Hi Fabian,
>>>>>>>
>>>>>>> Thanks for the prompt reply, and it's working 🤗.
>>>>>>>
>>>>>>> I am trying to deploy 200 SQL unions, and it seems all the tasks
>>>>>>> start failing after some time.
>>>>>>>
>>>>>>> How do I allocate memory for the task manager and job manager? What
>>>>>>> factors need to be considered?
>>>>>>>
>>>>>>> Cheers
>>>>>>> Dhanuka
>>>>>>>
>>>