Leonard Xu created FLINK-17847:
---------------------------------- Summary: ArrayIndexOutOfBoundsException happened in codegen StreamExec operator Key: FLINK-17847 URL: https://issues.apache.org/jira/browse/FLINK-17847 Project: Flink Issue Type: Bug Components: Table SQL / Runtime Affects Versions: 1.10.0, 1.11.0 Reporter: Leonard Xu Fix For: 1.11.0, 1.10.0 User's query: {code:java} //source table create table json_table( w_es BIGINT, w_type STRING, w_isDdl BOOLEAN, w_data ARRAY<ROW<pay_info STRING, online_fee DOUBLE, sign STRING, account_pay_fee DOUBLE>>, w_ts TIMESTAMP(3), w_table STRING) WITH ( 'connector.type' = 'kafka', 'connector.version' = '0.10', 'connector.topic' = 'json-test2', 'connector.properties.zookeeper.connect' = 'localhost:2181', 'connector.properties.bootstrap.servers' = 'localhost:9092', 'connector.properties.group.id' = 'test-jdbc', 'connector.startup-mode' = 'earliest-offset', 'format.type' = 'json', 'format.derive-schema' = 'true' ) // real data: {"w_es":1589870637000,"w_type":"INSERT","w_isDdl":false,"w_data":[{"pay_info":"channelId=82&onlineFee=89.0&outTradeNo=0&payId=0&payType=02&rechargeId=4&totalFee=89.0&tradeStatus=success&userId=32590183789575&sign=00","online_fee":"89.0","sign":"00","account_pay_fee":"0.0"}],"w_ts":"2020-05-20T13:58:37.131Z","w_table":"cccc111"} //query select w_ts, 'test' as city1_id, w_data[0].pay_info AS cate3_id, w_data as pay_order_id from json_table {code} ~exception:~ {code:java} // Caused by: java.lang.ArrayIndexOutOfBoundsException: 1427848 at org.apache.flink.table.runtime.util.SegmentsUtil.getByteMultiSegments(SegmentsUtil.java:598) at org.apache.flink.table.runtime.util.SegmentsUtil.getByte(SegmentsUtil.java:590) at org.apache.flink.table.runtime.util.SegmentsUtil.bitGet(SegmentsUtil.java:534) at org.apache.flink.table.dataformat.BinaryArray.isNullAt(BinaryArray.java:117) at StreamExecCalc$10.processElement(Unknown Source) {code} It looks like in the codegen StreamExecCalc$10 
operator, some operation accesses index '-1', which is wrong; this bug exists in both 1.10 and 1.11 {code:java} public class StreamExecCalc$10 extends org.apache.flink.table.runtime.operators.AbstractProcessStreamOperator implements org.apache.flink.streaming.api.operators.OneInputStreamOperator { private final Object[] references; private final org.apache.flink.table.dataformat.BinaryString str$3 = org.apache.flink.table.dataformat.BinaryString.fromString("test"); private transient org.apache.flink.table.runtime.typeutils.BaseArraySerializer typeSerializer$5; final org.apache.flink.table.dataformat.BoxedWrapperRow out = new org.apache.flink.table.dataformat.BoxedWrapperRow(4); private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); public StreamExecCalc$10( Object[] references, org.apache.flink.streaming.runtime.tasks.StreamTask task, org.apache.flink.streaming.api.graph.StreamConfig config, org.apache.flink.streaming.api.operators.Output output) throws Exception { this.references = references; typeSerializer$5 = (((org.apache.flink.table.runtime.typeutils.BaseArraySerializer) references[0])); this.setup(task, config, output); } @Override public void open() throws Exception { super.open(); } @Override public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { org.apache.flink.table.dataformat.BaseRow in1 = (org.apache.flink.table.dataformat.BaseRow) element.getValue(); org.apache.flink.table.dataformat.SqlTimestamp field$2; boolean isNull$2; org.apache.flink.table.dataformat.BaseArray field$4; boolean isNull$4; org.apache.flink.table.dataformat.BaseArray field$6; org.apache.flink.table.dataformat.BinaryString field$8; boolean isNull$8; org.apache.flink.table.dataformat.BinaryString result$9; boolean isNull$9; isNull$2 = in1.isNullAt(4); field$2 = null; if (!isNull$2) { field$2 = in1.getTimestamp(4, 3); 
} isNull$4 = in1.isNullAt(3); field$4 = null; if (!isNull$4) { field$4 = in1.getArray(3); } field$6 = field$4; if (!isNull$4) { field$6 = (org.apache.flink.table.dataformat.BaseArray) (typeSerializer$5.copy(field$6)); } out.setHeader(in1.getHeader()); if (isNull$2) { out.setNullAt(0); } else { out.setNonPrimitiveValue(0, field$2); } if (false) { out.setNullAt(1); } else { out.setNonPrimitiveValue(1, ((org.apache.flink.table.dataformat.BinaryString) str$3)); } boolean isNull$7 = isNull$4 || false || field$6.isNullAt(((int) 0) - 1); org.apache.flink.table.dataformat.BaseRow result$7 = isNull$7 ? null : field$6.getRow(((int) 0) - 1, 4); if (isNull$7) { result$9 = org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8; isNull$9 = true; } else { isNull$8 = result$7.isNullAt(0); field$8 = org.apache.flink.table.dataformat.BinaryString.EMPTY_UTF8; if (!isNull$8) { field$8 = result$7.getString(0); } result$9 = field$8; isNull$9 = isNull$8; } if (isNull$9) { out.setNullAt(2); } else { out.setNonPrimitiveValue(2, result$9); } if (isNull$4) { out.setNullAt(3); } else { out.setNonPrimitiveValue(3, field$6); } output.collect(outElement.replace(out)); } @Override public void close() throws Exception { super.close(); } } {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |