Flink POJO type in forAvroRecordClass function of Kafka010AvroTableSource

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Flink POJO type in forAvroRecordClass function of Kafka010AvroTableSource

Ansari, Kafeel (Nokia - IN/Bangalore)
Hi Team ,
I am using Flink 1.5.0  and  tried to use below code for Kafka consumer in Flink. I need some information for Flink POJO type .

       KafkaTableSource kf_source = Kafka010AvroTableSource.builder()
                           .forTopic("test-topic")
                           .withKafkaProperties(props)
                           .withSchema(tb_schema)
                           .forAvroRecordClass(StoreSales.class)  // ?? How can we create our own custom class to use in this function ?
                           .build();



Scenario :
 I need to collect avro records from Kafka topic using Kafka010AvroTableSource with below schema.

               long datetime,long ss_sold_date_sk ,long ss_sold_time_sk,long ss_item_sk,long ss_customer_sk ,long ss_cdemo_sk,long ss_hdemo_sk ,long ss_addr_sk,long ss_store_sk ,long ss_promo_sk,long ss_ticket_number,int ss_quantity ,double ss_wholesale_cost,double ss_list_price,double ss_sales_price,double ss_ext_discount_amt,double ss_ext_sales_price,double ss_ext_wholesale_cost,double ss_ext_list_price,double ss_ext_tax,double ss_coupon_amt,double ss_net_paid,double ss_net_paid_inc_tax,double ss_net_profit


I have passed the schema in "forAvroRecordClass" function  in above code snippet . Below is the code for tb_schema.


             TableSchema tb_schema = TableSchema.builder()
                           .field("datetime",Types.LONG())
                           .field("ss_sold_date_sk",Types.LONG())
                           .field("ss_sold_time_sk",Types.LONG())
                           .field("ss_item_sk",Types.LONG())
                           .field("ss_customer_sk",Types.LONG())
                           .field("ss_cdemo_sk",Types.LONG())
                           .field("ss_hdemo_sk",Types.LONG())
                           .field("ss_addr_sk",Types.LONG())
                           .field("ss_store_sk",Types.LONG())
                           .field("ss_promo_sk",Types.LONG())
                           .field("ss_ticket_number",Types.LONG())
                           .field("ss_quantity",Types.INT())
                           .field("ss_wholesale_cost",Types.DOUBLE())
                           .field("ss_list_price",Types.DOUBLE())
                           .field("ss_sales_price",Types.DOUBLE())
                           .field("ss_ext_discount_amt",Types.DOUBLE())
                           .field("ss_ext_sales_price",Types.DOUBLE())
                           .field("ss_ext_wholesale_cost",Types.DOUBLE())
                           .field("ss_ext_list_price",Types.DOUBLE())
                           .field("ss_ext_tax",Types.DOUBLE())
                           .field("ss_coupon_amt",Types.DOUBLE())
                           .field("ss_net_paid",Types.DOUBLE())
                           .field("ss_net_paid_inc_tax",Types.DOUBLE())
                           .field("ss_net_profit",Types.DOUBLE())
                           .build();

Issue :
 The StoreSales class which I created is not compatible with "forAvroRecordClass" function. Could anyone please suggest a way to make it compatible with the code ?



public class StoreSales extends SpecificRecordBase {
       public long datetime;
       public long ss_sold_date_sk;
       public long ss_sold_time_sk;
       public long ss_item_sk;
       public long ss_customer_sk ;
       public long ss_cdemo_sk;
       public long ss_hdemo_sk ;
       public long ss_addr_sk;
       public long ss_store_sk ;
       public long ss_promo_sk;
       public long ss_ticket_number;
       public int ss_quantity ;
       public double ss_wholesale_cost;
       public double ss_list_price;
       public double ss_sales_price;
       public double ss_ext_discount_amt;
       public double ss_ext_sales_price;
       public double ss_ext_wholesale_cost;
       public double ss_ext_list_price;
       public double ss_ext_tax;
       public double ss_coupon_amt;
       public double ss_net_paid;
       public double ss_net_paid_inc_tax;
       public double ss_net_profit;

       @Override
       public Schema getSchema() {
             // TODO Auto-generated method stub
             return null;
       }

       @Override
       public Object get(int field) {
             // TODO Auto-generated method stub
             return null;
       }

       @Override
       public void put(int field, Object value) {
             // TODO Auto-generated method stub

       }


Regards,
Kafeel Ansari