Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

杜斌
Hi,
I need help with this issue. Here is what Flink reported when I enabled
checkpointing on the StreamExecutionEnvironment:

/* 1 */
/* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
/* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
/* 4 */
/* 5 */        private final Object[] references;
/* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
/* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
/* 8 */
/* 9 */        public SourceConversion$1(
/* 10 */            Object[] references,
/* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
/* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
/* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
/* 14 */          this.references = references;
/* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
/* 16 */          this.setup(task, config, output);
/* 17 */        }
/* 18 */
/* 19 */        @Override
/* 20 */        public void open() throws Exception {
/* 21 */          super.open();
/* 22 */
/* 23 */        }
/* 24 */
/* 25 */        @Override
/* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
/* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
/* 28 */
/* 29 */
/* 30 */
/* 31 */          output.collect(outElement.replace(in1));
/* 32 */        }
/* 33 */
/* 34 */
/* 35 */
/* 36 */        @Override
/* 37 */        public void close() throws Exception {
/* 38 */           super.close();
/* 39 */
/* 40 */        }
/* 41 */
/* 42 */
/* 43 */      }
/* 44 */

Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
    at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
    at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
    at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
    at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
    at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
    at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
    at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
    at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)

Janino compiles the generated code successfully when I remove the checkpoint
setting from the env.

Can anyone help with this?

Thanks,
Bin

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

杜斌
Adding the full stack trace here:


Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
    at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
    ... 14 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
    at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
    at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
    ... 17 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
    at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
    at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
    at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
    at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
    at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
    at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
    at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
    at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
    at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
    at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
    at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
    at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
    at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
    at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
    at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
    at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
    at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
    at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
    at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
    at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
    at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
    ... 23 more
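
The trace above nests three "Caused by" levels, and only the innermost one (the Janino CompileException) names the actual failure. As a minimal sketch using nothing beyond the JDK, the helper below walks a cause chain down to its root. The names RootCauseSketch and rootCauseOf are hypothetical, and the exceptions built in main are stand-ins for illustration, not the real Flink or Janino types.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;

// Hypothetical helper for illustration; not part of Flink or Janino.
class RootCauseSketch {

    // Follows getCause() until no further cause exists and returns the innermost throwable.
    // The identity set guards against a (rare) cyclic cause chain.
    static Throwable rootCauseOf(Throwable top) {
        Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
        Throwable current = top;
        while (current.getCause() != null && seen.add(current)) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // Stand-in exceptions that mimic the nesting seen in the trace.
        Exception compileError =
                new Exception("Line 2, Column 46: Cannot determine simple type name \"org\"");
        Exception invalidProgram =
                new IllegalStateException("Table program cannot be compiled.", compileError);
        Exception outer = new RuntimeException(invalidProgram);

        // Prints the message of the innermost exception.
        System.out.println(rootCauseOf(outer).getMessage());
    }
}
```

Logging the root cause alongside the top-level exception would have surfaced the "Cannot determine simple type name" message in the first report already.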

On Wed, 17 Jun 2020 at 10:29, 杜斌 <[hidden email]> wrote:


Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

Jark Wu-2
Hi,

Which Flink version are you using? Are you using the SQL CLI? Could you share
your Table/SQL program?
We did fix some classloading problems around the SQL CLI, e.g. FLINK-18302.

Best,
Jark
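
Since the reply points at classloading, one quick check is whether the class named on line 2 of the generated code is visible to the class loader that ends up doing the compilation. The sketch below is an illustration under assumptions, using only the JDK: ClassVisibilitySketch and isVisible are hypothetical names, and which loader Flink actually hands to Janino in this setup is not established in this thread, so the thread context class loader is used here only as a plausible candidate.

```java
// Hypothetical diagnostic for illustration; not part of Flink.
class ClassVisibilitySketch {

    // Returns true if the given loader can resolve the class, without initializing it.
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the context class loader is the one worth inspecting.
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        String operatorBase =
                "org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator";

        System.out.println("java.lang.String visible: "
                + isVisible("java.lang.String", contextLoader));
        System.out.println(operatorBase + " visible: "
                + isVisible(operatorBase, contextLoader));
    }
}
```

Running such a check in the process that builds the job graph, just before the failing call, would show whether the Flink table runtime classes are reachable from that loader.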

On Wed, 17 Jun 2020 at 10:31, 杜斌 <[hidden email]> wrote:


Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

杜斌
Thanks for the reply.
Here is a simple Java program that reproduces the problem:
1. Code for the application:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;


import java.util.Arrays;

public class Test {
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // With checkpointing enabled, compilation fails under the Blink planner.
        env.enableCheckpointing(1000);

        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
//                .useBlinkPlanner() // compilation fails
                .useOldPlanner() // compilation succeeds
                .inStreamingMode()
                .build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
        DataStream<Order> orderA = env.fromCollection(Arrays.asList(
                new Order(1L, "beer", 3),
                new Order(1L, "diaper", 4),
                new Order(3L, "beer", 2)
        ));

//        Table table = tEnv.fromDataStream(orderA);
        tEnv.createTemporaryView("orderA", orderA);
        Table res = tEnv.sqlQuery("SELECT * FROM orderA");
        DataStream<Tuple2<Boolean, Row>> ds = tEnv.toRetractStream(res, Row.class);
        ds.print();
        env.execute();

    }

    public static class Order {
        public long user;
        public String product;
        public int amount;

        public Order(long user, String product, int amount) {
            this.user = user;
            this.product = product;
            this.amount = amount;
        }

        public long getUser() {
            return user;
        }

        public void setUser(long user) {
            this.user = user;
        }

        public String getProduct() {
            return product;
        }

        public void setProduct(String product) {
            this.product = product;
        }

        public int getAmount() {
            return amount;
        }

        public void setAmount(int amount) {
            this.amount = amount;
        }
    }
}

2. Run mvn clean package to build a jar file.
3. Then we use the following code to produce a JobGraph:

PackagedProgram program =
        PackagedProgram.newBuilder()
                .setJarFile(userJarPath)
                .setUserClassPaths(classpaths)
                .setEntryPointClassName(userMainClass)
                .setConfiguration(configuration)
                .setSavepointRestoreSettings(
                        (descriptor.isRecoverFromSavepoint()
                                && descriptor.getSavepointPath() != null
                                && !descriptor.getSavepointPath().equals(""))
                                ? SavepointRestoreSettings.forPath(
                                        descriptor.getSavepointPath(),
                                        descriptor.isAllowNonRestoredState())
                                : SavepointRestoreSettings.none())
                .setArguments(userArgs)
                .build();

JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 4, true);

4. If we use the Blink planner and enable checkpointing, compilation fails.
In all other combinations, compilation succeeds.
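
The stack trace shows the generated operator being compiled while PackagedProgramUtils.createJobGraph runs, so one experiment (an assumption, not something confirmed in this thread) is to make the user-code class loader the thread context class loader for the duration of that call. The sketch below shows only the generic set-and-restore pattern with JDK classes: ContextClassLoaderSketch and callWithContextClassLoader are hypothetical names, and the empty URLClassLoader merely stands in for whatever loader holds the job jar and the Flink table classes.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.Callable;

// Hypothetical helper for illustration; not part of Flink.
class ContextClassLoaderSketch {

    // Runs the action with the given loader as thread context class loader,
    // then restores the previous loader even if the action throws.
    static <T> T callWithContextClassLoader(ClassLoader loader, Callable<T> action)
            throws Exception {
        Thread thread = Thread.currentThread();
        ClassLoader previous = thread.getContextClassLoader();
        thread.setContextClassLoader(loader);
        try {
            return action.call();
        } finally {
            thread.setContextClassLoader(previous);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = ContextClassLoaderSketch.class.getClassLoader();
        // Stand-in for a user-code class loader; a real one would list the job jar URLs.
        try (URLClassLoader userLoader = new URLClassLoader(new URL[0], parent)) {
            ClassLoader seen = callWithContextClassLoader(
                    userLoader, () -> Thread.currentThread().getContextClassLoader());
            System.out.println("action saw user loader: " + (seen == userLoader));
        }
    }
}
```

In the job-graph code from step 3, the Callable would wrap the createJobGraph call, with the loader being the one that holds the user jar. Whether this resolves the failure would still need to be verified.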

Thanks,
Bin

On Wed, 17 Jun 2020 at 10:42, Jark Wu <[hidden email]> wrote:

> Hi,
>
> Which Flink version are you using? Are you using SQL CLI? Could you share
> your table/sql program?
> We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 10:31, 杜斌 <[hidden email]> wrote:
>
> > add the full stack trace here:
> >
> >
> > Caused by:
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> > org.apache.flink.api.common.InvalidProgramException: Table program cannot
> > be compiled. This is a bug. Please file an issue.
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > at
> >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > ... 14 more
> > Caused by: org.apache.flink.api.common.InvalidProgramException: Table
> > program cannot be compiled. This is a bug. Please file an issue.
> > at
> >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > at
> >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > at
> >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > ... 17 more
> > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column
> > 46: Cannot determine simple type name "org"
> > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > at
> > org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > at
> >
> >
> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > at
> >
> >
> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > at
> >
> org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > at
> >
> >
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > at
> >
> >
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > at
> >
> >
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > at
> >
> >
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > at
> >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > ... 23 more
> >
> > 杜斌 <[hidden email]> 于2020年6月17日周三 上午10:29写道:
> >
> > > Hi,
> > > Need help on this issue, here is what Flink reported when I enable the
> > > checkpoint setting of the StreamExecutionEnvironment:
> > >
> > > /* 1 */
> > > /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> > > /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> > > /* 4 */
> > > /* 5 */        private final Object[] references;
> > > /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
> > > /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> > > /* 8 */
> > > /* 9 */        public SourceConversion$1(
> > > /* 10 */            Object[] references,
> > > /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> > > /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> > > /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> > > /* 14 */          this.references = references;
> > > /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
> > > /* 16 */          this.setup(task, config, output);
> > > /* 17 */        }
> > > /* 18 */
> > > /* 19 */        @Override
> > > /* 20 */        public void open() throws Exception {
> > > /* 21 */          super.open();
> > > /* 22 */
> > > /* 23 */        }
> > > /* 24 */
> > > /* 25 */        @Override
> > > /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> > > /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> > > /* 28 */
> > > /* 29 */
> > > /* 30 */
> > > /* 31 */          output.collect(outElement.replace(in1));
> > > /* 32 */        }
> > > /* 33 */
> > > /* 34 */
> > > /* 35 */
> > > /* 36 */        @Override
> > > /* 37 */        public void close() throws Exception {
> > > /* 38 */           super.close();
> > > /* 39 */
> > > /* 40 */        }
> > > /* 41 */
> > > /* 42 */
> > > /* 43 */      }
> > > /* 44 */
> > >
> > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> > > at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> > > at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> > > at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> > > at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> > > at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> > > at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
> > >
> > > Janino compiles the generated code successfully when I remove the
> > > checkpoint setting from the env.
> > >
> > > Can anyone help with this?
> > >
> > > Thanks,
> > > Bin
> > >
> >
>

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

Jark Wu-2
Why are you compiling the JobGraph yourself? This is really an internal API
and may cause problems.
Could you try submitting your user jar with the `flink run` command [1]
instead?

Btw, which Flink version are you using? If it is Flink 1.10.0, could you
try 1.10.1?

Best,
Jark

On Wed, 17 Jun 2020 at 12:41, 杜斌 <[hidden email]> wrote:

> Thanks for the reply,
> Here is a simple Java program that reproduces the problem:
> 1. Code for the application:
>
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
>
> import java.util.Arrays;
>
> public class Test {
>     public static void main(String[] args) throws Exception {
>
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         // With checkpointing enabled, compilation fails under the Blink planner.
>         env.enableCheckpointing(1000);
>
>         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> //                .useBlinkPlanner() // compilation fails
>                 .useOldPlanner() // compilation succeeds
>                 .inStreamingMode()
>                 .build();
>         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
>         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
>                 new Order(1L, "beer", 3),
>                 new Order(1L, "diaper", 4),
>                 new Order(3L, "beer", 2)
>         ));
>
> //        Table table = tEnv.fromDataStream(orderA);
>         tEnv.createTemporaryView("orderA", orderA);
>         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
>         DataStream<Tuple2<Boolean, Row>> ds = tEnv.toRetractStream(res, Row.class);
>         ds.print();
>         env.execute();
>
>     }
>
>     public static class Order {
>         public long user;
>         public String product;
>         public int amount;
>
>         public Order(long user, String product, int amount) {
>             this.user = user;
>             this.product = product;
>             this.amount = amount;
>         }
>
>         public long getUser() {
>             return user;
>         }
>
>         public void setUser(long user) {
>             this.user = user;
>         }
>
>         public String getProduct() {
>             return product;
>         }
>
>         public void setProduct(String product) {
>             this.product = product;
>         }
>
>         public int getAmount() {
>             return amount;
>         }
>
>         public void setAmount(int amount) {
>             this.amount = amount;
>         }
>     }
> }
>
> 2. Run `mvn clean package` to build a jar file.
> 3. Then we use the following code to produce a JobGraph:
>
> PackagedProgram program =
>         PackagedProgram.newBuilder()
>                 .setJarFile(userJarPath)
>                 .setUserClassPaths(classpaths)
>                 .setEntryPointClassName(userMainClass)
>                 .setConfiguration(configuration)
>                 .setSavepointRestoreSettings(
>                         (descriptor.isRecoverFromSavepoint()
>                                 && descriptor.getSavepointPath() != null
>                                 && !descriptor.getSavepointPath().equals(""))
>                                 ? SavepointRestoreSettings.forPath(
>                                         descriptor.getSavepointPath(),
>                                         descriptor.isAllowNonRestoredState())
>                                 : SavepointRestoreSettings.none())
>                 .setArguments(userArgs)
>                 .build();
>
> JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 4, true);
>
> 4. With the Blink planner and checkpointing enabled, compilation fails.
> In every other combination, compilation succeeds.
>
> Thanks,
> Bin
>
> On Wed, 17 Jun 2020 at 10:42, Jark Wu <[hidden email]> wrote:
>
> > Hi,
> >
> > Which Flink version are you using? Are you using SQL CLI? Could you share
> > your table/sql program?
> > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> >
> > Best,
> > Jark
> >
> > On Wed, 17 Jun 2020 at 10:31, 杜斌 <[hidden email]> wrote:
> >
> > > add the full stack trace here:
> > >
> > >
> > > Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > ... 14 more
> > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > ... 17 more
> > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
> > > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > ... 23 more
> > >

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

Fabian Hueske-2
The exception is thrown when the StreamGraph is translated to a JobGraph.
The translation logic has a switch. If checkpointing is enabled, the Java
code generated by the optimizer is directly compiled in the client (in
contrast to compiling it on the TaskManagers when the operators are
started).
The compiler needs access to the UDF classes but the classloader that's
being used doesn't know about the UDF classes.
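
To illustrate the failure mode, here is a minimal standalone Java sketch (plain JDK, not Flink code) showing how a class can be visible to one classloader and invisible to another. The names `ClassVisibility` and `isVisible` are made up for this illustration, and the isolated loader merely stands in for a classloader that was never given the user jar.

```java
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo class, not part of Flink.
class ClassVisibility {

    /** Returns true if the given loader (or one of its parents) can resolve className. */
    static boolean isVisible(String className, ClassLoader loader) {
        try {
            // initialize=false: only resolve the class, do not run static initializers.
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        String self = ClassVisibility.class.getName();

        // A loader with no URLs and a null parent delegates only to the bootstrap
        // loader, so it sees JDK core classes but nothing from the application classpath.
        try (URLClassLoader isolated = new URLClassLoader(new URL[0], null)) {
            System.out.println("String via isolated loader: "
                    + isVisible("java.lang.String", isolated));
            System.out.println("Demo class via isolated loader: "
                    + isVisible(self, isolated));
        }

        // The loader that loaded this class can resolve it by name.
        System.out.println("Demo class via its own loader: "
                + isVisible(self, ClassVisibility.class.getClassLoader()));
    }
}
```

A compiler that resolves type names through the isolated loader would fail on any application class in the same way, regardless of whether the class exists on disk.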

The classloader that's used for the compilation is generated by
PackagedProgram.
You can configure the PackagedProgram with the right user code JAR URLs
which contain the UDF classes.
Alternatively, you can try to inject another classloader into
PackagedProgram using Reflection (but that's a rather hacky approach).
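
As a rough sketch of the first option, the helper below turns jar paths into absolute `file:` URLs of the kind a classpath list would hold. It uses only the JDK. `UserJarUrls`, `toUrls`, and the example paths are hypothetical names for illustration, and it assumes that the `classpaths` argument passed to `setUserClassPaths` in the program quoted in this thread is a `List<URL>`.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of Flink.
class UserJarUrls {

    /** Converts jar paths (relative or absolute) into absolute file: URLs. */
    static List<URL> toUrls(List<String> jarPaths) {
        List<URL> urls = new ArrayList<>();
        for (String jarPath : jarPaths) {
            try {
                // Resolve against the working directory and drop "." / ".." segments.
                urls.add(Paths.get(jarPath).toAbsolutePath().normalize().toUri().toURL());
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Not a usable jar path: " + jarPath, e);
            }
        }
        return urls;
    }

    public static void main(String[] args) {
        // Example paths only; replace them with the jars that contain the classes
        // the generated code refers to.
        List<URL> classpaths = toUrls(Arrays.asList("lib/udfs.jar", "/opt/job/app.jar"));
        classpaths.forEach(System.out::println);
    }
}
```

The resulting list is what would be handed to `.setUserClassPaths(classpaths)`. Whether that alone resolves the compile error reported here is not confirmed in this thread.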

Hope this helps.

Cheers, Fabian


Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

杜斌
In reply to this post by Jark Wu-2
I was using Flink 1.10.0. After upgrading to 1.10.1, the code works as
expected. Big thanks!
We compile the JobGraph ourselves mainly because we are building a
streaming service: we extract some metadata from the job, save it,
monitor the jobs, and so on.

Thanks,
Bin

On Wed, 17 Jun 2020 at 12:48, Jark Wu <[hidden email]> wrote:

> Why do you compile the JobGraph yourself? This is really an internal API
> and may cause problems.
> Could you try using the `flink run` command [1] to submit your user jar
> instead?
>
> Btw, what's your Flink version? If you are using Flink 1.10.0, could you
> try 1.10.1?
>
> Best,
> Jark
>
> On Wed, 17 Jun 2020 at 12:41, 杜斌 <[hidden email]> wrote:
>
> > Thanks for the reply,
> > Here is a simple Java program that reproduces the problem:
> > 1. Code for the application:
> >
> > import org.apache.flink.api.java.tuple.Tuple2;
> > import org.apache.flink.streaming.api.datastream.DataStream;
> > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > import org.apache.flink.table.api.EnvironmentSettings;
> > import org.apache.flink.table.api.Table;
> > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > import org.apache.flink.types.Row;
> >
> >
> > import java.util.Arrays;
> >
> > public class Test {
> >     public static void main(String[] args) throws Exception {
> >
> >         StreamExecutionEnvironment env =
> >                 StreamExecutionEnvironment.getExecutionEnvironment();
> >         // With checkpointing enabled, compilation fails under the Blink planner.
> >         env.enableCheckpointing(1000);
> >
> >         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> > //                .useBlinkPlanner() // compilation fails
> >                 .useOldPlanner() // compilation succeeds
> >                 .inStreamingMode()
> >                 .build();
> >         StreamTableEnvironment tEnv =
> >                 StreamTableEnvironment.create(env, envSettings);
> >         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
> >                 new Order(1L, "beer", 3),
> >                 new Order(1L, "diaper", 4),
> >                 new Order(3L, "beer", 2)
> >         ));
> >
> > //        Table table = tEnv.fromDataStream(orderA);
> >         tEnv.createTemporaryView("orderA", orderA);
> >         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> >         DataStream<Tuple2<Boolean, Row>> ds =
> >                 tEnv.toRetractStream(res, Row.class);
> >         ds.print();
> >         env.execute();
> >
> >     }
> >
> >     public static class Order {
> >         public long user;
> >         public String product;
> >         public int amount;
> >
> >         public Order(long user, String product, int amount) {
> >             this.user = user;
> >             this.product = product;
> >             this.amount = amount;
> >         }
> >
> >         public long getUser() {
> >             return user;
> >         }
> >
> >         public void setUser(long user) {
> >             this.user = user;
> >         }
> >
> >         public String getProduct() {
> >             return product;
> >         }
> >
> >         public void setProduct(String product) {
> >             this.product = product;
> >         }
> >
> >         public int getAmount() {
> >             return amount;
> >         }
> >
> >         public void setAmount(int amount) {
> >             this.amount = amount;
> >         }
> >     }
> > }
> >
> > 2. Run `mvn clean package` to build a jar file.
> > 3. Then we use the following code to produce a JobGraph:
> >
> > PackagedProgram program =
> >         PackagedProgram.newBuilder()
> >                 .setJarFile(userJarPath)
> >                 .setUserClassPaths(classpaths)
> >                 .setEntryPointClassName(userMainClass)
> >                 .setConfiguration(configuration)
> >                 .setSavepointRestoreSettings(
> >                         (descriptor.isRecoverFromSavepoint()
> >                                 && descriptor.getSavepointPath() != null
> >                                 && !descriptor.getSavepointPath().equals(""))
> >                                 ? SavepointRestoreSettings.forPath(
> >                                         descriptor.getSavepointPath(),
> >                                         descriptor.isAllowNonRestoredState())
> >                                 : SavepointRestoreSettings.none())
> >                 .setArguments(userArgs)
> >                 .build();
> >
> > JobGraph jobGraph =
> >         PackagedProgramUtils.createJobGraph(program, configuration, 4, true);
> >
> > 4. If we use the Blink planner and enable checkpointing, compilation fails.
> > In all other cases, compilation succeeds.
> >
> > Thanks,
> > Bin
> >
> > On Wed, 17 Jun 2020 at 10:42, Jark Wu <[hidden email]> wrote:
> >
> > > Hi,
> > >
> > > Which Flink version are you using? Are you using SQL CLI? Could you
> > > share your table/sql program?
> > > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <[hidden email]> wrote:
> > >
> > > > Adding the full stack trace here:
> > > >
> > > > Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > >     ... 14 more
> > > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > >     at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > >     ... 17 more
> > > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
> > > >     at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > >     at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > >     at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > >     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > >     at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > >     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > >     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > >     at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > >     at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > >     at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > >     at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > >     at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > >     at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > >     at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > >     at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > >     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > >     at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > >     at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > >     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > >     at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > >     at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > >     at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > >     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > >     at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > >     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > >     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > >     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > >     at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > >     ... 23 more

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

杜斌
In reply to this post by Fabian Hueske-2
Sure, it helps a lot. After updating to Flink 1.10.1, the problem is solved!

Thanks,
Bin

On Wed, 17 Jun 2020 at 16:32, Fabian Hueske <[hidden email]> wrote:

> The exception is thrown when the StreamGraph is translated to a JobGraph.
> The translation logic has a switch. If checkpointing is enabled, the Java
> code generated by the optimizer is directly compiled in the client (in
> contrast to compiling it on the TaskManagers when the operators are
> started).
> The compiler needs access to the UDF classes but the classloader that's
> being used doesn't know about the UDF classes.
>
> The classloader that's used for the compilation is generated by
> PackagedProgram.
> You can configure the PackagedProgram with the right user code JAR URLs
> which contain the UDF classes.
> Alternatively, you can try to inject another classloader into
> PackagedProgram using Reflection (but that's a rather hacky approach).
>
> Hope this helps.
>
> Cheers, Fabian
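
Fabian's point about class visibility can be checked outside Flink with plain JDK classes. The sketch below is only an illustration: `ClassVisibilityCheck`, `canResolve` and `loaderFor` are hypothetical names, not part of Flink or Janino. It asks whether a given classloader can find a class by name, first with the parent loader alone and then with a `URLClassLoader` that also sees a user JAR. A loader that cannot find the classes referenced by the generated code is the situation described above.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of Flink) for checking which classes a loader can see. */
class ClassVisibilityCheck {

    /** Returns true if the loader can find the class; the class is not initialized. */
    static boolean canResolve(String className, ClassLoader loader) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Builds a loader that sees the given JARs in addition to whatever the parent sees. */
    static URLClassLoader loaderFor(ClassLoader parent, Path... jars) {
        URL[] urls = new URL[jars.length];
        for (int i = 0; i < jars.length; i++) {
            try {
                urls[i] = jars[i].toUri().toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Not a valid JAR location: " + jars[i], e);
            }
        }
        return new URLClassLoader(urls, parent);
    }

    public static void main(String[] args) throws Exception {
        if (args.length < 2) {
            System.out.println("usage: ClassVisibilityCheck <jar> <className>");
            return;
        }
        ClassLoader parent = ClassVisibilityCheck.class.getClassLoader();
        String className = args[1];
        // The parent alone may not know classes that only exist in the user JAR.
        System.out.println("parent only: " + canResolve(className, parent));
        try (URLClassLoader withJar = loaderFor(parent, Paths.get(args[0]))) {
            System.out.println("with jar:    " + canResolve(className, withJar));
        }
    }
}
```

Running it with the path of the user jar and the name of a class inside it shows whether adding the jar URL makes a difference. In this thread the actual fix was upgrading to Flink 1.10.1, so the sketch is a diagnostic aid rather than the resolution.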
>
> Am Mi., 17. Juni 2020 um 06:48 Uhr schrieb Jark Wu <[hidden email]>:
>
> > Why compile JobGraph yourself? This is really an internal API and may
> cause
> > problems.
> > Could you try to use `flink run` command [1] to submit your user jar
> > instead?
> >
> > Btw, what's your Flink version? If you are using Flink 1.10.0, could you
> > try to use 1.10.1?
> >
> > Best,
> > Jark
> >
> > On Wed, 17 Jun 2020 at 12:41, 杜斌 <[hidden email]> wrote:
> >
> > > Thanks for the reply,
> > > Here is the simple java program that re-produce the problem:
> > > 1. code for the application:
> > >
> > > import org.apache.flink.api.java.tuple.Tuple2;
> > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > import
> > > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > import org.apache.flink.table.api.EnvironmentSettings;
> > > import org.apache.flink.table.api.Table;
> > > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > > import org.apache.flink.types.Row;
> > >
> > >
> > > import java.util.Arrays;
> > >
> > > public class Test {
> > >     public static void main(String[] args) throws Exception {
> > >
> > >         StreamExecutionEnvironment env =
> > > StreamExecutionEnvironment.getExecutionEnvironment();
> > >         /**
> > >          * If enable checkpoint, blink planner will failed
> > >          */
> > >         env.enableCheckpointing(1000);
> > >
> > >         EnvironmentSettings envSettings =
> > EnvironmentSettings.newInstance()
> > > //                .useBlinkPlanner() // compile fail
> > >                 .useOldPlanner() // compile success
> > >                 .inStreamingMode()
> > >                 .build();
> > >         StreamTableEnvironment tEnv =
> > > StreamTableEnvironment.create(env, envSettings);
> > >         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
> > >                 new Order(1L, "beer", 3),
> > >                 new Order(1L, "diaper", 4),
> > >                 new Order(3L, "beer", 2)
> > >         ));
> > >
> > > //        Table table = tEnv.fromDataStream(orderA);
> > >         tEnv.createTemporaryView("orderA", orderA);
> > >         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> > >         DataStream<Tuple2<Boolean, Row>> ds =
> > > tEnv.toRetractStream(res, Row.class);
> > >         ds.print();
> > >         env.execute();
> > >
> > >     }
> > >
> > >     public static class Order {
> > >         public long user;
> > >         public String product;
> > >         public int amount;
> > >
> > >         public Order(long user, String product, int amount) {
> > >             this.user = user;
> > >             this.product = product;
> > >             this.amount = amount;
> > >         }
> > >
> > >         public long getUser() {
> > >             return user;
> > >         }
> > >
> > >         public void setUser(long user) {
> > >             this.user = user;
> > >         }
> > >
> > >         public String getProduct() {
> > >             return product;
> > >         }
> > >
> > >         public void setProduct(String product) {
> > >             this.product = product;
> > >         }
> > >
> > >         public int getAmount() {
> > >             return amount;
> > >         }
> > >
> > >         public void setAmount(int amount) {
> > >             this.amount = amount;
> > >         }
> > >     }
> > > }
> > >
> > > 2. mvn clean package to a jar file
> > > 3. then we use the following code to produce a jobgraph:
> > >
> > > PackagedProgram program =
> > >         PackagedProgram.newBuilder()
> > >                 .setJarFile(userJarPath)
> > >                 .setUserClassPaths(classpaths)
> > >                 .setEntryPointClassName(userMainClass)
> > >                 .setConfiguration(configuration)
> > >
> > > .setSavepointRestoreSettings((descriptor.isRecoverFromSavepoint() &&
> > > descriptor.getSavepointPath() != null &&
> > > !descriptor.getSavepointPath().equals("")) ?
> > >
> > > SavepointRestoreSettings.forPath(descriptor.getSavepointPath(),
> > > descriptor.isAllowNonRestoredState()) :
> > > SavepointRestoreSettings.none())
> > >                 .setArguments(userArgs)
> > >                 .build();
> > >
> > >
> > > JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program,
> > > configuration, 4, true);
> > >
> > > 4. If we use blink planner & enable checkpoint, the compile will
> failed.
> > > For the others, the compile success.
> > >
> > > Thanks,
> > > Bin
> > >
> > > Jark Wu <[hidden email]> 于2020年6月17日周三 上午10:42写道:
> > >
> > > > Hi,
> > > >
> > > > Which Flink version are you using? Are you using SQL CLI? Could you
> > share
> > > > your table/sql program?
> > > > We did fix some classloading problems around SQL CLI, e.g.
> FLINK-18302
> > > >
> > > > Best,
> > > > Jark
> > > >
> > > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <[hidden email]> wrote:
> > > >
> > > > > add the full stack trace here:
> > > > >
> > > > >
> > > > > Caused by:
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException:
> > > > > org.apache.flink.api.common.InvalidProgramException: Table program
> > > cannot
> > > > > be compiled. This is a bug. Please file an issue.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > > > ... 14 more
> > > > > Caused by: org.apache.flink.api.common.InvalidProgramException:
> Table
> > > > > program cannot be compiled. This is a bug. Please file an issue.
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > > > ... 17 more
> > > > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2,
> > > Column
> > > > > 46: Cannot determine simple type name "org"
> > > > > at
> > > org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > at
> > > > >
> > >
> org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > at
> org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > > > at
> > org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > > > at
> > > org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > > > at
> > > org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > > > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > > > at
> > org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > > > at
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > > > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > > > at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > > > at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > > > at
> org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > > > at
> > org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > > > at
> > > > >
> > > > >
> > > >
> > >
> >
> org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > > > ... 23 more
> > > > >
> > > > > 杜斌 <[hidden email]> 于2020年6月17日周三 上午10:29写道:
> > > > >
> > > > > > Hi,
> > > > > > Need help on this issue, here is what Flink reported when I
> enable
> > > the
> > > > > > checkpoint setting of the StreamExecutionEnvironment:
> > > > > >
> > > > > > /* 1 */
> > > > > > /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> > > > > > /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> > > > > > /* 4 */
> > > > > > /* 5 */        private final Object[] references;
> > > > > > /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
> > > > > > /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> > > > > > /* 8 */
> > > > > > /* 9 */        public SourceConversion$1(
> > > > > > /* 10 */            Object[] references,
> > > > > > /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> > > > > > /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> > > > > > /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> > > > > > /* 14 */          this.references = references;
> > > > > > /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
> > > > > > /* 16 */          this.setup(task, config, output);
> > > > > > /* 17 */        }
> > > > > > /* 18 */
> > > > > > /* 19 */        @Override
> > > > > > /* 20 */        public void open() throws Exception {
> > > > > > /* 21 */          super.open();
> > > > > > /* 22 */
> > > > > > /* 23 */        }
> > > > > > /* 24 */
> > > > > > /* 25 */        @Override
> > > > > > /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> > > > > > /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> > > > > > /* 28 */
> > > > > > /* 29 */
> > > > > > /* 30 */
> > > > > > /* 31 */          output.collect(outElement.replace(in1));
> > > > > > /* 32 */        }
> > > > > > /* 33 */
> > > > > > /* 34 */
> > > > > > /* 35 */
> > > > > > /* 36 */        @Override
> > > > > > /* 37 */        public void close() throws Exception {
> > > > > > /* 38 */           super.close();
> > > > > > /* 39 */
> > > > > > /* 40 */        }
> > > > > > /* 41 */
> > > > > > /* 42 */
> > > > > > /* 43 */      }
> > > > > > /* 44 */
> > > > > > /* 44 */
> > > > > >
> > > > > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> > > > > > at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> > > > > > at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> > > > > > at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> > > > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> > > > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> > > > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> > > > > > at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> > > > > > at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> > > > > > at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> > > > > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> > > > > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
> > > > > >
> > > > > > Janino compiles the code successfully when I remove the checkpoint setting
> > > > > > of the env.
> > > > > >
> > > > > > Can anyone help with this?
> > > > > >
> > > > > > Thanks,
> > > > > > Bin
> > > > > >
> > > > >
> > > >
> > >
> >
>

Re: Flink Table program cannot be compiled when enable checkpoint of StreamExecutionEnvironment

Jark Wu-2
Glad to hear that!

On Thu, 18 Jun 2020 at 09:12, 杜斌 <[hidden email]> wrote:

> Sure, it helps a lot. After updating to Flink 1.10.1, the problem is solved!
>
> Thanks,
> Bin
>
> On Wed, 17 Jun 2020 at 16:32, Fabian Hueske <[hidden email]> wrote:
>
> > The exception is thrown when the StreamGraph is translated to a JobGraph.
> > The translation logic has a switch. If checkpointing is enabled, the Java
> > code generated by the optimizer is directly compiled in the client (in
> > contrast to compiling it on the TaskManagers when the operators are
> > started).
> > The compiler needs access to the UDF classes but the classloader that's
> > being used doesn't know about the UDF classes.
> >
> > The classloader that's used for the compilation is generated by
> > PackagedProgram.
> > You can configure the PackagedProgram with the right user code JAR URLs
> > which contain the UDF classes.
> > Alternatively, you can try to inject another classloader into
> > PackagedProgram using Reflection (but that's a rather hacky approach).
> >
> > Hope this helps.
> >
> > Cheers, Fabian
> >
> > On Wed, 17 Jun 2020 at 06:48, Jark Wu <[hidden email]> wrote:
> >
> > > Why compile the JobGraph yourself? This is really an internal API and may
> > > cause problems.
> > > Could you try to use `flink run` command [1] to submit your user jar
> > > instead?
> > >
> > > Btw, what's your Flink version? If you are using Flink 1.10.0, could you
> > > try to use 1.10.1?
> > >
> > > Best,
> > > Jark
> > >
> > > On Wed, 17 Jun 2020 at 12:41, 杜斌 <[hidden email]> wrote:
> > >
> > > > Thanks for the reply,
> > > > Here is a simple Java program that reproduces the problem:
> > > > 1. code for the application:
> > > >
> > > > import org.apache.flink.api.java.tuple.Tuple2;
> > > > import org.apache.flink.streaming.api.datastream.DataStream;
> > > > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> > > > import org.apache.flink.table.api.EnvironmentSettings;
> > > > import org.apache.flink.table.api.Table;
> > > > import org.apache.flink.table.api.java.StreamTableEnvironment;
> > > > import org.apache.flink.types.Row;
> > > >
> > > >
> > > > import java.util.Arrays;
> > > >
> > > > public class Test {
> > > >     public static void main(String[] args) throws Exception {
> > > >
> > > >         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> > > >         /**
> > > >          * If checkpointing is enabled, the Blink planner fails.
> > > >          */
> > > >         env.enableCheckpointing(1000);
> > > >
> > > >         EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
> > > > //                .useBlinkPlanner() // compilation fails
> > > >                 .useOldPlanner() // compilation succeeds
> > > >                 .inStreamingMode()
> > > >                 .build();
> > > >         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, envSettings);
> > > >         DataStream<Order> orderA = env.fromCollection(Arrays.asList(
> > > >                 new Order(1L, "beer", 3),
> > > >                 new Order(1L, "diaper", 4),
> > > >                 new Order(3L, "beer", 2)
> > > >         ));
> > > >
> > > > //        Table table = tEnv.fromDataStream(orderA);
> > > >         tEnv.createTemporaryView("orderA", orderA);
> > > >         Table res = tEnv.sqlQuery("SELECT * FROM orderA");
> > > >         DataStream<Tuple2<Boolean, Row>> ds = tEnv.toRetractStream(res, Row.class);
> > > >         ds.print();
> > > >         env.execute();
> > > >
> > > >     }
> > > >
> > > >     public static class Order {
> > > >         public long user;
> > > >         public String product;
> > > >         public int amount;
> > > >
> > > >         public Order(long user, String product, int amount) {
> > > >             this.user = user;
> > > >             this.product = product;
> > > >             this.amount = amount;
> > > >         }
> > > >
> > > >         public long getUser() {
> > > >             return user;
> > > >         }
> > > >
> > > >         public void setUser(long user) {
> > > >             this.user = user;
> > > >         }
> > > >
> > > >         public String getProduct() {
> > > >             return product;
> > > >         }
> > > >
> > > >         public void setProduct(String product) {
> > > >             this.product = product;
> > > >         }
> > > >
> > > >         public int getAmount() {
> > > >             return amount;
> > > >         }
> > > >
> > > >         public void setAmount(int amount) {
> > > >             this.amount = amount;
> > > >         }
> > > >     }
> > > > }
> > > >
> > > > 2. Run mvn clean package to build a jar file.
> > > > 3. Then we use the following code to produce a JobGraph:
> > > >
> > > > PackagedProgram program =
> > > >         PackagedProgram.newBuilder()
> > > >                 .setJarFile(userJarPath)
> > > >                 .setUserClassPaths(classpaths)
> > > >                 .setEntryPointClassName(userMainClass)
> > > >                 .setConfiguration(configuration)
> > > >                 .setSavepointRestoreSettings(
> > > >                         (descriptor.isRecoverFromSavepoint()
> > > >                                 && descriptor.getSavepointPath() != null
> > > >                                 && !descriptor.getSavepointPath().equals(""))
> > > >                                 ? SavepointRestoreSettings.forPath(descriptor.getSavepointPath(), descriptor.isAllowNonRestoredState())
> > > >                                 : SavepointRestoreSettings.none())
> > > >                 .setArguments(userArgs)
> > > >                 .build();
> > > >
> > > >
> > > > JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 4, true);
> > > >
> > > > 4. If we use the Blink planner and enable checkpointing, compilation fails.
> > > > In all other cases, compilation succeeds.
> > > >
> > > > Thanks,
> > > > Bin
> > > >
> > > > On Wed, 17 Jun 2020 at 10:42, Jark Wu <[hidden email]> wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > Which Flink version are you using? Are you using SQL CLI? Could you share
> > > > > your table/sql program?
> > > > > We did fix some classloading problems around SQL CLI, e.g. FLINK-18302
> > > > >
> > > > > Best,
> > > > > Jark
> > > > >
> > > > > On Wed, 17 Jun 2020 at 10:31, 杜斌 <[hidden email]> wrote:
> > > > >
> > > > > > Adding the full stack trace here:
> > > > > >
> > > > > >
> > > > > > Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739)
> > > > > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66)
> > > > > > ... 14 more
> > > > > > Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > > > at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81)
> > > > > > at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282)
> > > > > > at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197)
> > > > > > ... 17 more
> > > > > > Caused by: org.codehaus.commons.compiler.CompileException: Line 2, Column 46: Cannot determine simple type name "org"
> > > > > > at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6746)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6507)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > > at org.codehaus.janino.UnitCompiler.getReferenceType(UnitCompiler.java:6520)
> > > > > > at org.codehaus.janino.UnitCompiler.getType2(UnitCompiler.java:6486)
> > > > > > at org.codehaus.janino.UnitCompiler.access$13800(UnitCompiler.java:215)
> > > > > > at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6394)
> > > > > > at org.codehaus.janino.UnitCompiler$21$1.visitReferenceType(UnitCompiler.java:6389)
> > > > > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3917)
> > > > > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6389)
> > > > > > at org.codehaus.janino.UnitCompiler$21.visitType(UnitCompiler.java:6382)
> > > > > > at org.codehaus.janino.Java$ReferenceType.accept(Java.java:3916)
> > > > > > at org.codehaus.janino.UnitCompiler.getType(UnitCompiler.java:6382)
> > > > > > at org.codehaus.janino.UnitCompiler.access$1300(UnitCompiler.java:215)
> > > > > > at org.codehaus.janino.UnitCompiler$33.getSuperclass2(UnitCompiler.java:9886)
> > > > > > at org.codehaus.janino.IClass.getSuperclass(IClass.java:455)
> > > > > > at org.codehaus.janino.IClass.getIMethods(IClass.java:260)
> > > > > > at org.codehaus.janino.IClass.getIMethods(IClass.java:237)
> > > > > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:492)
> > > > > > at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432)
> > > > > > at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215)
> > > > > > at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411)
> > > > > > at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406)
> > > > > > at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414)
> > > > > > at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406)
> > > > > > at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378)
> > > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237)
> > > > > > at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465)
> > > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216)
> > > > > > at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207)
> > > > > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
> > > > > > at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
> > > > > > at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78)
> > > > > > ... 23 more
> > > > > >
> > > > > > On Wed, 17 Jun 2020 at 10:29, 杜斌 <[hidden email]> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > I need help with this issue. Here is what Flink reported when I enabled the
> > > > > > > checkpoint setting of the StreamExecutionEnvironment:
> > > > > > >
> > > > > > > /* 1 */
> > > > > > > /* 2 */      public class SourceConversion$1 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator
> > > > > > > /* 3 */          implements org.apache.flink.streaming.api.operators.OneInputStreamOperator {
> > > > > > > /* 4 */
> > > > > > > /* 5 */        private final Object[] references;
> > > > > > > /* 6 */        private transient org.apache.flink.table.dataformat.DataFormatConverters.RowConverter converter$0;
> > > > > > > /* 7 */        private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null);
> > > > > > > /* 8 */
> > > > > > > /* 9 */        public SourceConversion$1(
> > > > > > > /* 10 */            Object[] references,
> > > > > > > /* 11 */            org.apache.flink.streaming.runtime.tasks.StreamTask task,
> > > > > > > /* 12 */            org.apache.flink.streaming.api.graph.StreamConfig config,
> > > > > > > /* 13 */            org.apache.flink.streaming.api.operators.Output output) throws Exception {
> > > > > > > /* 14 */          this.references = references;
> > > > > > > /* 15 */          converter$0 = (((org.apache.flink.table.dataformat.DataFormatConverters.RowConverter) references[0]));
> > > > > > > /* 16 */          this.setup(task, config, output);
> > > > > > > /* 17 */        }
> > > > > > > /* 18 */
> > > > > > > /* 19 */        @Override
> > > > > > > /* 20 */        public void open() throws Exception {
> > > > > > > /* 21 */          super.open();
> > > > > > > /* 22 */
> > > > > > > /* 23 */        }
> > > > > > > /* 24 */
> > > > > > > /* 25 */        @Override
> > > > > > > /* 26 */        public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception {
> > > > > > > /* 27 */          org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) (org.apache.flink.table.dataformat.BaseRow) converter$0.toInternal((org.apache.flink.types.Row) element.getValue());
> > > > > > > /* 28 */
> > > > > > > /* 29 */
> > > > > > > /* 30 */
> > > > > > > /* 31 */          output.collect(outElement.replace(in1));
> > > > > > > /* 32 */        }
> > > > > > > /* 33 */
> > > > > > > /* 34 */
> > > > > > > /* 35 */
> > > > > > > /* 36 */        @Override
> > > > > > > /* 37 */        public void close() throws Exception {
> > > > > > > /* 38 */           super.close();
> > > > > > > /* 39 */
> > > > > > > /* 40 */        }
> > > > > > > /* 41 */
> > > > > > > /* 42 */
> > > > > > > /* 43 */      }
> > > > > > > /* 44 */
> > > > > > >
> > > > > > > Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue.
> > > > > > > at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68)
> > > > > > > at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78)
> > > > > > > at org.apache.flink.table.runtime.generated.GeneratedClass.getClass(GeneratedClass.java:96)
> > > > > > > at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.getStreamOperatorClass(CodeGenOperatorFactory.java:62)
> > > > > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate(StreamingJobGraphGenerator.java:214)
> > > > > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:149)
> > > > > > > at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:104)
> > > > > > > at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:777)
> > > > > > > at org.apache.flink.streaming.api.graph.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:52)
> > > > > > > at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:43)
> > > > > > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:57)
> > > > > > > at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:85)
> > > > > > >
> > > > > > > Janino compiles the code successfully when I remove the checkpoint setting
> > > > > > > of the env.
> > > > > > >
> > > > > > > Can anyone help with this?
> > > > > > >
> > > > > > > Thanks,
> > > > > > > Bin
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
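
Fabian's explanation in the quoted reply above (the client-side compiler needs a classloader that can see the user classes, and PackagedProgram should be given the right user code JAR URLs) can be checked with a small standalone sketch. This is only an illustration under stated assumptions: the class name UserCodeClasspathCheck and the JAR path target/user-job.jar are hypothetical, "Test" is the main class from the reproduction program, and no Flink or Janino API is called here. It only shows how to build the URL list and verify that a classloader built from it can resolve a class by name.

```java
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Flink: checks whether a set of JARs
// makes a given class visible to a classloader.
public class UserCodeClasspathCheck {

    /** Converts local JAR paths into the URL form a classloader expects. */
    static List<URL> toUrls(List<Path> jars) throws MalformedURLException {
        List<URL> urls = new ArrayList<>();
        for (Path jar : jars) {
            urls.add(jar.toAbsolutePath().toUri().toURL());
        }
        return urls;
    }

    /** Returns true if the loader can find the class (without initializing it). */
    static boolean canResolve(ClassLoader loader, String className) {
        try {
            Class.forName(className, false, loader);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        // Assumed JAR location; replace with the real user JAR path.
        List<URL> urls = toUrls(Collections.singletonList(Paths.get("target/user-job.jar")));

        // Child loader that sees the user JAR in addition to the application classpath.
        try (URLClassLoader loader = new URLClassLoader(
                urls.toArray(new URL[0]),
                UserCodeClasspathCheck.class.getClassLoader())) {
            // "Test" is the entry class of the reproduction program in this thread.
            System.out.println("Test visible: " + canResolve(loader, "Test"));
        }
    }
}
```

If the class is not visible to a loader built from the same URLs that are handed to PackagedProgram, a compile error like the one in this thread would be a plausible outcome, although the thread itself reports that upgrading to Flink 1.10.1 resolved the problem.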