Rockey Cui created FLINK-15212:
---------------------------------- Summary: PROCTIME attribute causes problems with timestamp times before 1900 ? Key: FLINK-15212 URL: https://issues.apache.org/jira/browse/FLINK-15212 Project: Flink Issue Type: Bug Components: Table SQL / API Affects Versions: 1.9.1 Environment: flink 1.9.1 jdk1.8.0_211 idea2019.3 Reporter: Rockey Cui A simple DataStreamSource with timestamp registered as a table. {code:java} StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); env.setParallelism(1); DataStreamSource<String> stringDataStreamSource = env.fromElements( "1001,1002,adc0,1900-01-01 00:00:00.0", "1002,1003,adc1,1910-01-01 00:00:00.0", "1003,1004,adc2,1920-01-01 00:00:00.0", "1004,1005,adc3,1930-01-01 00:00:00.0", "1005,1006,adc4,1970-01-01 00:00:00.0", "8888,6666,adc5,1971-01-01 00:00:00.0" ); TypeInformation<?>[] fieldTypes = new TypeInformation[]{Types.LONG, Types.LONG, Types.STRING, Types.SQL_TIM String[] fieldNames = new String[]{"id", "cityId", "url", "clickTime"}; RowTypeInfo rowTypeInfo = new RowTypeInfo(fieldTypes, fieldNames); DataStream<Row> stream = stringDataStreamSource.map((MapFunction<String, Row>) s -> { String[] split = s.split(","); Row row = new Row(split.length); for (int i = 0; i < split.length; i++) { Object value = null; if (fieldTypes[i].equals(Types.STRING)) { value = split[i]; } if (fieldTypes[i].equals(Types.LONG)) { value = Long.valueOf(split[i]); } if (fieldTypes[i].equals(Types.INT)) { value = Integer.valueOf(split[i]); } if (fieldTypes[i].equals(Types.DOUBLE)) { value = Double.valueOf(split[i]); } if (fieldTypes[i].equals(Types.SQL_TIMESTAMP)) { value = Timestamp.valueOf(split[i]); } row.setField(i, value); } //System.out.println(row.toString()); return row; }).returns(rowTypeInfo); tableEnv.registerDataStream("user_click_info", stream, String.join(",", fieldNames) + ",www.proctime"); String sql = "select * from user_click_info"; Table table = tableEnv.sqlQuery(sql); DataStream<Row> result = tableEnv.toAppendStream(table, Row.class); result.print(); table.printSchema(); tableEnv.execute("Test"); {code} result ==> root |-- id: BIGINT |-- cityId: BIGINT |-- url: STRING |-- clickTime: TIMESTAMP(3) |-- www: TIMESTAMP(3) *PROCTIME* 1001,1002,adc0,{color:#FF0000}1899-12-31 23:54:17.0{color},2019-12-12 03:37:18.036 1002,1003,adc1,1910-01-01 00:00:00.0,2019-12-12 03:37:18.196 1003,1004,adc2,1920-01-01 00:00:00.0,2019-12-12 03:37:18.196 1004,1005,adc3,1930-01-01 00:00:00.0,2019-12-12 03:37:18.196 1005,1006,adc4,1970-01-01 00:00:00.0,2019-12-12 03:37:18.196 8888,6666,adc5,1971-01-01 00:00:00.0,2019-12-12 03:37:18.196 ---- without PROCTIME attribute is OK ==> root |-- id: BIGINT |-- cityId: BIGINT |-- url: STRING |-- clickTime: TIMESTAMP(3) 1001,1002,adc0,1900-01-01 00:00:00.0 1002,1003,adc1,1910-01-01 00:00:00.0 1003,1004,adc2,1920-01-01 00:00:00.0 1004,1005,adc3,1930-01-01 00:00:00.0 1005,1006,adc4,1970-01-01 00:00:00.0 8888,6666,adc5,1971-01-01 00:00:00.0 -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |