humengyu created FLINK-18530:
--------------------------------

             Summary: ParquetAvroWriters can not write data to hdfs
                 Key: FLINK-18530
                 URL: https://issues.apache.org/jira/browse/FLINK-18530
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.11.0
            Reporter: humengyu

I read data from Kafka and write it to HDFS with StreamingFileSink:

# In version 1.11.0, ParquetAvroWriters does not work, but it works well in version 1.10.1.
# AvroWriters works well in 1.11.0.

{code:java}
public class TestParquetAvroSink {

  @Test
  public void testParquet() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder()
        .fields(
            new String[]{"id", "name", "sex"},
            new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDataStream = rowDataStream
        .map(new RecordMapFunction());
    recordDataStream.print();
    recordDataStream.addSink(sink);

    env.execute("test");
  }

  @Test
  public void testAvro() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder()
        .fields(
            new String[]{"id", "name", "sex"},
            new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            AvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDataStream = rowDataStream
        .map(new RecordMapFunction());
    recordDataStream.print();
    recordDataStream.addSink(sink);

    env.execute("test");
  }

  public static class RecordMapFunction implements MapFunction<Row, GenericRecord> {

    private transient Schema schema;

    @Override
    public GenericRecord map(Row row) throws Exception {
      if (schema == null) {
        schema = SchemaBuilder
            .record("xxx")
            .namespace("xxx")
            .fields()
            .optionalString("id")
            .optionalString("name")
            .optionalString("sex")
            .endRecord();
      }
      Record record = new Record(schema);
      record.put("id", row.getField(0));
      record.put("name", row.getField(1));
      record.put("sex", row.getField(2));
      return record;
    }
  }
}
{code}
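The Kafka source is elided above ({{DataStream<Row> rowDataStream = xxxx;}}). As a rough sketch only, not the reporter's actual source, one way to build such a stream, assuming a comma-separated string payload and hypothetical topic/broker/group names:

{code:java}
// Minimal sketch, NOT the reporter's code: topic, brokers, group id,
// and the "id,name,sex" payload format are all assumptions.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "host:9092"); // hypothetical broker
props.setProperty("group.id", "test-group");         // hypothetical group id

FlinkKafkaConsumer<String> consumer =
    new FlinkKafkaConsumer<>("test-topic", new SimpleStringSchema(), props);

DataStream<Row> rowDataStream = env
    .addSource(consumer)
    .map(line -> {
      // assume each record is "id,name,sex"
      String[] parts = line.split(",", -1);
      return Row.of(parts[0], parts[1], parts[2]);
    })
    .returns(Types.ROW(Types.STRING, Types.STRING, Types.STRING));
{code}

Any source that yields a three-field {{Row}} of (id, name, sex) should do; the two tests are identical except for the writer factory passed to {{forBulkFormat}} (ParquetAvroWriters vs. AvroWriters).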
--
This message was sent by Atlassian Jira
(v8.3.4#803005)