Xianxun Ye created FLINK-20181:
---------------------------------- Summary: RowData cannot cast to Tuple2 Key: FLINK-20181 URL: https://issues.apache.org/jira/browse/FLINK-20181 Project: Flink Issue Type: Bug Reporter: Xianxun Ye I want to emit CDC data using my own StreamOperator. Flink version: 1.11.2, Blink planner. {code:java} //代码占位符 getTableEnv().registerTableSource( "source", new StreamTableSource<RowData>() { TableSchema tableSchema = TableSchema.builder() .field("id", new AtomicDataType(new IntType(false))) .field("name", DataTypes.STRING()) .field("type", DataTypes.STRING()) .primaryKey("id") .build(); @Override public DataStream<RowData> getDataStream(StreamExecutionEnvironment execEnv) { return execEnv.addSource(new DebugSourceFunction(tableSchema.toRowDataType())); } @Override public TableSchema getTableSchema() { return tableSchema; } @Override public DataType getProducedDataType() { return getTableSchema().toRowDataType().bridgedTo(RowData.class); } } ); sql("insert into Test.testdb.animal " + " SELECT id, name, type, '2020' as da, '11' as hr" + " from source" ); class DebugSourceFunction extends RichParallelSourceFunction<RowData> implements ResultTypeQueryable<RowData> { DataType dataType; public DebugSourceFunction(DataType dataType) { this.dataType = dataType; } @Override public TypeInformation<RowData> getProducedType() { return (TypeInformation<RowData>) createTypeInformation(dataType); } @Override public void run(SourceContext<RowData> ctx) throws Exception { ctx.collect(GenericRowData.ofKind(RowKind.INSERT, 1, StringData.fromString("monkey"), StringData.fromString("small"))); } @Override public void cancel() { } public TypeInformation<?> createTypeInformation(DataType producedDataType) { final DataType internalDataType = DataTypeUtils.transform( producedDataType, TypeTransformations.TO_INTERNAL_CLASS); return fromDataTypeToTypeInfo(internalDataType); } } public class TestUpsertTableSink implements UpsertStreamTableSink<RowData>, OverwritableTableSink, PartitionableTableSink { 
@Override public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, RowData>> dataStream) { DataStream<Void> returnStream = dataStream .map( (MapFunction<Tuple2<Boolean, RowData>, RowData>) value -> value.f1 ) ...... return returnStream .addSink(new DiscardingSink<>()) .setParallelism(1); } } {code} When I execute the SQL statement `insert into ...`, a ClassCastException is thrown: {code:java} //代码占位符 Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2 Caused by: java.lang.ClassCastException: org.apache.flink.table.data.GenericRowData cannot be cast to org.apache.flink.api.java.tuple.Tuple2 at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at StreamExecCalc$8.processElement(Unknown Source) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:717) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:692) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:672) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:52) at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:30) at 
org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104) {code} -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |