seunjjs created FLINK-19655:
-------------------------------

             Summary: NPE when using blink planner and TemporalTableFunction after setting IdleStateRetentionTime
                 Key: FLINK-19655
                 URL: https://issues.apache.org/jira/browse/FLINK-19655
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Planner
    Affects Versions: 1.10.0
            Reporter: seunjjs


Here is my code:

    EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
    final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, bsSettings);
    tableEnv.getConfig().setIdleStateRetentionTime(Time.seconds(60), Time.seconds(600));

    final Table table = tableEnv.from("tableName");
    final TableFunction<?> function = table.createTemporalTableFunction(
            temporalTableEntry.getTimeAttribute(),
            String.join(",", temporalTableEntry.getPrimaryKeyFields()));
    tableEnv.registerFunction(temporalTableEntry.getName(), function);

An NPE was thrown when I executed my program:

    java.lang.NullPointerException
        at org.apache.flink.table.runtime.operators.join.temporal.BaseTwoInputStreamOperatorWithStateRetention.registerProcessingCleanupTimer(BaseTwoInputStreamOperatorWithStateRetention.java:109)
        at org.apache.flink.table.runtime.operators.join.temporal.TemporalProcessTimeJoinOperator.processElement2(TemporalProcessTimeJoinOperator.java:98)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processRecord2(StreamTwoInputProcessor.java:145)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.lambda$new$1(StreamTwoInputProcessor.java:107)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessor.java:362)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
        at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
        at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor.processInput(StreamTwoInputProcessor.java:185)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
        at java.lang.Thread.run(Thread.java:748)

When I switched to useOldPlanner, it worked fine. When I debugged the code, I found that BaseTwoInputStreamOperatorWithStateRetention#open was never executed.

Here is the BaseTwoInputStreamOperatorWithStateRetention#open code:

    public void open() throws Exception {
        initializeTimerService();

        if (stateCleaningEnabled) {
            ValueStateDescriptor<Long> cleanupStateDescriptor =
                    new ValueStateDescriptor<>(CLEANUP_TIMESTAMP, Types.LONG);
            latestRegisteredCleanupTimer = getRuntimeContext().getState(cleanupStateDescriptor);
        }
    }

Here is the TemporalProcessTimeJoinOperator#open code:

    public void open() throws Exception {
        this.joinCondition = generatedJoinCondition.newInstance(getRuntimeContext().getUserCodeClassLoader());
        FunctionUtils.setFunctionRuntimeContext(joinCondition, getRuntimeContext());
        FunctionUtils.openFunction(joinCondition, new Configuration());

        ValueStateDescriptor<BaseRow> rightStateDesc = new ValueStateDescriptor<>("right", rightType);
        this.rightState = getRuntimeContext().getState(rightStateDesc);
        this.collector = new TimestampedCollector<>(output);
        this.outRow = new JoinedRow();
        // consider watermark from left stream only.
        super.processWatermark2(Watermark.MAX_WATERMARK);
    }

I compared this code with the old planner (TemporalProcessTimeJoin#open). Maybe TemporalProcessTimeJoinOperator#open should call super.open()?

Here is the TemporalProcessTimeJoin#open code:

    override def open(): Unit = {
      LOG.debug(s"Compiling FlatJoinFunction: $genJoinFuncName \n\n Code:\n$genJoinFuncCode")
      val clazz = compile(
        getRuntimeContext.getUserCodeClassLoader,
        genJoinFuncName,
        genJoinFuncCode)

      LOG.debug("Instantiating FlatJoinFunction.")
      joinFunction = clazz.newInstance()
      FunctionUtils.setFunctionRuntimeContext(joinFunction, getRuntimeContext)
      FunctionUtils.openFunction(joinFunction, new Configuration())

      val rightStateDescriptor = new ValueStateDescriptor[Row]("right", rightType)
      rightState = getRuntimeContext.getState(rightStateDescriptor)

      collector = new TimestampedCollector[CRow](output)
      cRowWrapper = new CRowWrappingCollector()
      cRowWrapper.out = collector

      super.open()
    }

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
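
For illustration only, the suspected cause can be reproduced outside Flink with a minimal sketch. It assumes the NPE comes from a field that the base class initializes in open() and that the subclass never triggers because it overrides open() without calling super.open(). All class, field, and method names below are hypothetical stand-ins, not the actual Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

public class SuperOpenSketch {

    /** Hypothetical stand-in for a base operator that owns cleanup state. */
    static class BaseOperatorWithRetention {
        // Stays null until open() runs on this class.
        protected Map<String, Long> cleanupTimers;

        public void open() {
            cleanupTimers = new HashMap<>();
        }

        protected void registerCleanupTimer(String key, long timestamp) {
            // Dereferences the field; fails with an NPE if open() was skipped.
            cleanupTimers.put(key, timestamp);
        }
    }

    /** Overrides open() but forgets to call super.open(). */
    static class BrokenJoinOperator extends BaseOperatorWithRetention {
        @Override
        public void open() {
            // Subclass-specific setup only; base state is never initialized.
        }

        public void processElement(String key) {
            registerCleanupTimer(key, 60_000L);
        }
    }

    /** Same operator with the missing super.open() call added. */
    static class FixedJoinOperator extends BaseOperatorWithRetention {
        @Override
        public void open() {
            super.open();
            // Subclass-specific setup would follow here.
        }

        public void processElement(String key) {
            registerCleanupTimer(key, 60_000L);
        }
    }

    /** Returns true if running the action throws a NullPointerException. */
    static boolean throwsNpe(Runnable action) {
        try {
            action.run();
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        BrokenJoinOperator broken = new BrokenJoinOperator();
        broken.open();
        System.out.println("broken throws NPE: " + throwsNpe(() -> broken.processElement("k")));

        FixedJoinOperator fixed = new FixedJoinOperator();
        fixed.open();
        System.out.println("fixed throws NPE: " + throwsNpe(() -> fixed.processElement("k")));
    }
}
```

Under these assumptions the first operator fails on its first element and the second does not, which matches the behaviour difference reported between the blink planner and the old planner. Whether this is the actual root cause in Flink would need to be confirmed against the real operator code.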