[jira] [Created] (FLINK-18530) ParquetAvroWriters can not write data to hdfs

Shang Yuanchun (Jira)
humengyu created FLINK-18530:
--------------------------------

             Summary: ParquetAvroWriters can not write data to hdfs
                 Key: FLINK-18530
                 URL: https://issues.apache.org/jira/browse/FLINK-18530
             Project: Flink
          Issue Type: Bug
          Components: Connectors / FileSystem
    Affects Versions: 1.11.0
            Reporter: humengyu


I read data from Kafka and write it to HDFS with StreamingFileSink:
 # in version 1.11.0, ParquetAvroWriters does not write any data, although it works well in version 1.10.1;
 # AvroWriters works well in 1.11.0.

{code:java}
public class TestParquetAvroSink {

  @Test
  public void testParquet() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder().fields(
        new String[]{"id", "name", "sex"},
        new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            ParquetAvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
        .map(new RecordMapFunction());
    recordDateStream.print();
    recordDateStream.addSink(sink);

    env.execute("test");
  }

  @Test
  public void testAvro() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner()
        .inStreamingMode().build();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
    env.enableCheckpointing(20000L);

    TableSchema tableSchema = TableSchema.builder().fields(
        new String[]{"id", "name", "sex"},
        new DataType[]{DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING()})
        .build();

    // build a kafka source
    DataStream<Row> rowDataStream = xxxx;

    Schema schema = SchemaBuilder
        .record("xxx")
        .namespace("xxxx")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("hdfs://host:port/xxx/xxx/xxx"),
            AvroWriters.forGenericRecord(schema))
        .withOutputFileConfig(config)
        .withBucketAssigner(new DateTimeBucketAssigner<>("'pdate='yyyy-MM-dd"))
        .build();

    SingleOutputStreamOperator<GenericRecord> recordDateStream = rowDataStream
        .map(new RecordMapFunction());
    recordDateStream.print();
    recordDateStream.addSink(sink);

    env.execute("test");
  }

  public static class RecordMapFunction implements MapFunction<Row, GenericRecord> {

    private transient Schema schema;

    @Override
    public GenericRecord map(Row row) throws Exception {
      if (schema == null) {
        schema = SchemaBuilder
            .record("xxx")
            .namespace("xxx")
            .fields()
            .optionalString("id")
            .optionalString("name")
            .optionalString("sex")
            .endRecord();
      }
      Record record = new Record(schema);
      record.put("id", row.getField(0));
      record.put("name", row.getField(1));
      record.put("sex", row.getField(2));
      return record;
    }
  }
}
{code}
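
For reference, a minimal self-contained variant of the ParquetAvroWriters path might look like the sketch below. It is not part of the original report: it assumes an in-memory source instead of Kafka, a local file:// target instead of HDFS, and adds an explicit GenericRecordAvroTypeInfo (not used above) so the GenericRecord stream is serialized with Avro rather than falling back to Kryo. The class name, record/field names, and output path are made up for illustration.

{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.core.fs.Path;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.types.Row;
import org.junit.Test;

public class ParquetAvroLocalTest {

  @Test
  public void testParquetLocal() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Bulk part files are finalized only on checkpoint completion; with a bounded source the
    // job can finish before the first checkpoint fires, leaving only in-progress files behind.
    env.enableCheckpointing(500L);

    Schema schema = SchemaBuilder
        .record("person").namespace("test")
        .fields()
        .optionalString("id")
        .optionalString("name")
        .optionalString("sex")
        .endRecord();

    DataStream<GenericRecord> records = env
        .fromElements(Row.of("1", "alice", "f"), Row.of("2", "bob", "m"))
        .returns(Types.ROW(Types.STRING, Types.STRING, Types.STRING))
        .map(new RowToRecord())
        // Explicit Avro type info (an addition, not in the report) so GenericRecord
        // is handled by the Avro serializer instead of Kryo.
        .returns(new GenericRecordAvroTypeInfo(schema));

    StreamingFileSink<GenericRecord> sink = StreamingFileSink
        .forBulkFormat(
            new Path("file:///tmp/parquet-avro-test"),
            ParquetAvroWriters.forGenericRecord(schema))
        .build();

    records.addSink(sink);
    env.execute("parquet-avro-local-test");
  }

  /** Same pattern as RecordMapFunction above: the schema is rebuilt lazily instead of being serialized. */
  public static class RowToRecord implements MapFunction<Row, GenericRecord> {

    private transient Schema schema;

    @Override
    public GenericRecord map(Row row) throws Exception {
      if (schema == null) {
        schema = SchemaBuilder
            .record("person").namespace("test")
            .fields()
            .optionalString("id")
            .optionalString("name")
            .optionalString("sex")
            .endRecord();
      }
      GenericRecord record = new GenericData.Record(schema);
      record.put("id", row.getField(0));
      record.put("name", row.getField(1));
      record.put("sex", row.getField(2));
      return record;
    }
  }
}
{code}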



--
This message was sent by Atlassian Jira
(v8.3.4#803005)