Fanbin Bu created FLINK-15928:
---------------------------------

Summary: Batch mode in blink planner caused IndexOutOfBoundsException error
Key: FLINK-15928
URL: https://issues.apache.org/jira/browse/FLINK-15928
Project: Flink
Issue Type: Bug
Affects Versions: 1.9.2
Reporter: Fanbin Bu

Flink version: 1.9.2
Mode: batch mode, running on EMR with YARN

The details follow.

Table source sample:

class SnowflakeTableSource(val schema: TableSchema,
                           val parallelism: Int,
                           val fetchSize: Int,
                           val query: String,
                           val options: SnowflakeOptions) extends StreamTableSource[Row] {

  override def getDataStream(execEnv: StreamExecutionEnvironment): SingleOutputStreamOperator[Row] = {
    execEnv.createInput(getInputFormat, getReturnType).name("app_event_stream")
  }

  override def getReturnType: TypeInformation[Row] = schema.toRowType

  override def getTableSchema: TableSchema = schema

  override def isBounded: Boolean = true

  private def getInputFormat: JDBCInputFormat = {
    JDBCInputFormat.buildJDBCInputFormat
      .setDrivername(options.driverName)
      .setDBUrl(options.dbUrl)
      .setUsername(options.username)
      .setPassword(options.password)
      .setQuery(query)
      .setRowTypeInfo(getInputRowTypeInfo)
      .setFetchSize(fetchSize)
      .setParametersProvider(new GenericParameterValuesProvider(buildQueryParams(parallelism)))
      .finish
  }
}

Sample setup code:

val settings = EnvironmentSettings.newInstance()
  .useBlinkPlanner()
  .inBatchMode()
  .build()
val tableEnv = TableEnvironment.create(settings)
val configurations = tableEnv.getConfig.getConfiguration
configurations.setString(
  TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY.key,
  s"${Globals.TABLE_EXEC_RESOURCE_HASH_AGG_MEMORY} mb")
tableEnv.registerTableSource(tableName, tableSource)
queryResult = tableEnv.sqlQuery(sql)
tableEnv.execute()

Sample SQL:

select ip_address
     , hop_end(created_at, interval '30' second, interval '1' minute) as bucket_ts
     , sum(case when name = 'signin' then 1 else 0 end) as signin_count_1m
     , sum(case when name = 'signin_failure' then 1 else 0 end) as signin_failure_count_1m
     ...
from events
group by ip_address
       , hop(created_at, interval '30' second, interval '1' minute)

Stacktrace:

java.lang.IndexOutOfBoundsException
    at org.apache.flink.core.memory.MemorySegment.getInt(MemorySegment.java:701)
    at org.apache.flink.table.dataformat.BinaryRow.getInt(BinaryRow.java:264)
    at HashWinAggWithKeys$538.endInput(Unknown Source)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.endInput(OperatorChain.java:276)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.checkFinished(StreamOneInputProcessor.java:151)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:138)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:279)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
    at java.lang.Thread.run(Thread.java:748)

The fact that the same code works with other SQL queries, together with the stack trace, suggests this may be a memory-related issue. It happens only with the blink planner in batch mode; I tried using BatchTableEnvironment with the old planner and it works.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
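For reference, the old-planner workaround I mentioned looks roughly like the sketch below. This is a minimal illustration, not the exact code: it assumes a batch variant of the table source (the old planner's BatchTableEnvironment registers a BatchTableSource rather than a bounded StreamTableSource), and `tableName`, `tableSource`, and `sql` stand for the same values used above.

```scala
// Sketch of the old-planner batch setup that did NOT hit the
// IndexOutOfBoundsException (Flink 1.9 APIs; names illustrative).
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.table.api.scala.BatchTableEnvironment

val env = ExecutionEnvironment.getExecutionEnvironment
val tableEnv = BatchTableEnvironment.create(env)

// tableName / tableSource / sql: same inputs as in the blink-planner setup,
// with the source adapted to the old planner's batch interface.
tableEnv.registerTableSource(tableName, tableSource)
val queryResult = tableEnv.sqlQuery(sql)

// Consume queryResult (e.g. write to a sink), then submit the job:
env.execute("old-planner batch job")
```

With this setup the same windowed-aggregation SQL completes without error, which is why the failure appears specific to the blink planner's batch hash aggregation.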