GenericType<org.apache.commons.net.ntp.TimeStamp>; Actual: LocalDateTime

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

GenericType<org.apache.commons.net.ntp.TimeStamp>; Actual: LocalDateTime

邵志鹏
Dear:
Why no stable common examples using flink 1.9.3 about streaming sql with rowtime...


Thanks for helping.


flink1.9.3
root
 |-- error_code: STRING
 |-- window_start: TIMESTAMP(3) *ROWTIME*
 |-- window_end: TIMESTAMP(3) *ROWTIME*
 |-- total_num: BIGINT NOT NULL


POJO's type is TIMESTAMP in flink1.7.2 was right!


Exception in thread "main" org.apache.flink.table.api.TableException: Result field 'window_start' does not match requested type. Requested: GenericType<org.apache.commons.net.ntp.TimeStamp>; Actual: LocalDateTime


0. https://stackoverflow.com/questions/54339315/org-apache-flink-table-api-tableexception-arity-3-of-result-does-not-match-th


1. table_hop.select("window_start.cast(TIMESTAMP(3).bridgedTo(Timestamp.class))");
Exception in thread "main" org.apache.flink.table.api.ValidationException: Undefined function: bridgedTo
at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$visit$0(LookupCallResolver.java:46)
at java.util.Optional.orElseThrow(Optional.java:290)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:46)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:35)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.lambda$resolveChildren$1(LookupCallResolver.java:61)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1382)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.resolveChildren(LookupCallResolver.java:62)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:51)
at org.apache.flink.table.expressions.resolver.LookupCallResolver.visit(LookupCallResolver.java:35)
at org.apache.flink.table.expressions.ApiExpressionVisitor.visit(ApiExpressionVisitor.java:37)
at org.apache.flink.table.expressions.LookupCallExpression.accept(LookupCallExpression.java:66)
at org.apache.flink.table.api.internal.TableImpl.lambda$select$0(TableImpl.java:123)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:124)
at org.apache.flink.table.api.internal.TableImpl.select(TableImpl.java:117)


2. another problem
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
Table table_hop = tableEnv.sqlQuery(query_hop);
Table t = table_hop.select("error_code");

t.printSchema();
DataStream<String> dsRow = tableEnv.toAppendStream(t, String.class);
dsRow.print();
root
 |-- error_code: STRING


/* 1 */

/* 2 */      public class SinkConversion$19 extends org.apache.flink.table.runtime.operators.TableStreamOperator

/* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {

/* 4 */

/* 5 */        private final Object[] references;

/* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.StringConverter converter$18;

/* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);

/* 8 */

/* 9 */        public SinkConversion$19(

/* 10 */            Object[] references,

/* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,

/* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,

/* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {

/* 14 */          this.references = references;

/* 15 */          converter$18 = (((org.apache.flink.table.dataformat.DataFormatConverters.StringConverter) references[0]));

/* 16 */          this.setup(task, config, output);

/* 17 */        }

/* 18 */

/* 19 */        @Override

/* 20 */        public void open() throws Exception {

/* 21 */          super.open();

/* 22 */          

/* 23 */        }

/* 24 */

/* 25 */        @Override

/* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {

/* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) element.getValue();

/* 28 */          

/* 29 */          

/* 30 */          

/* 31 */          

/* 32 */          

/* 33 */          output.collect(outElement.replace((java.lang.String) converter$18.toExternal((org.apache.flink.table.dataformat.BinaryString) in1)));

/* 34 */          

/* 35 */        }

/* 36 */

/* 37 */        

/* 38 */

/* 39 */        @Override

/* 40 */        public void close() throws Exception {

/* 41 */           super.close();

/* 42 */          

/* 43 */        }

/* 44 */

/* 45 */        

/* 46 */      }

/* 47 */    




Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.

at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)

at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:631)

at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:117)

at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1507)

at com.fosun.bigdata.monitor.streaming.cep.SingleTraPayInfoReplenishJob.main(SingleTraPayInfoReplenishJob.java:180)

Caused by: java.lang.RuntimeException: Could not instantiate generated class 'SinkConversion$19'

at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67)

at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:47)

at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:428)

at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:354)

at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:144)

at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:393)

at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)

at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)

at java.lang.Thread.run(Thread.java:748)

Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.

at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)

at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:65)

at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)

at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65)

... 8 more

Caused by: org.codehaus.commons.compiler.CompileException: Line 33, Column 140: Cannot cast "org.apache.flink.table.dataformat.BaseRow" to "org.apache.flink.table.dataformat.BinaryString"

at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5049)

at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)

at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)

at org.codehaus.janino.Java$Cast.accept(Java.java:4887)

at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)

at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)

at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)

at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)

at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)

at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5019)

at org.codehaus.janino.UnitCompiler.access$8600(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4416)

at org.codehaus.janino.UnitCompiler$16.visitCast(UnitCompiler.java:4394)

at org.codehaus.janino.Java$Cast.accept(Java.java:4887)

at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)

at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)

at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)

at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)

at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)

at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)

at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5165)

at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421)

at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)

at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394)

at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575)

at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781)

at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760)

at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732)

at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062)

at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732)

at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360)

at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494)

at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487)

at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871)

at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487)

at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567)

at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388)

at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357)

at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330)

at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822)

at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)

at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)

at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)

at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)

at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)

at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)

at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)

at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)

at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)

at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)

at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)

at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)

at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)

at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)

... 11 more