liu created FLINK-20330:
---------------------------

             Summary: Flink connector fails to support Hive external tables (HBase or ES)
                 Key: FLINK-20330
                 URL: https://issues.apache.org/jira/browse/FLINK-20330
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Hive
    Affects Versions: 1.11.2, 1.11.1, 1.11.0
         Environment: Test code like this:

{code:sql}
CREATE EXTERNAL TABLE hive_to_es (
  key string,
  value string
)
STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
TBLPROPERTIES(
  'es.resource' = 'hive_to_es/_doc',
  'es.index.auto.create' = 'TRUE',
  'es.nodes' = '192.168.1.111:9200,192.168.1.112:9200,192.168.1.113:9200'
);

insert into hive_to_es (key, value) values ('name', 'tom');
insert into hive_to_es (key, value) values ('yes', 'aaa');

select * from hive_to_es;
{code}

!image-2020-11-25-09-51-00-100.png|width=807,height=134!

            Reporter: liu
         Attachments: image-2020-11-25-09-42-13-102.png, image-2020-11-25-09-51-00-100.png


Running the query fails with:

{noformat}
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.connectors.hive.FlinkHiveException: Unable to instantiate the hadoop input format
{noformat}

!image-2020-11-25-09-42-13-102.png|width=384,height=288!

We added a patch like this, in flink-connector-hive_2.12-1.11.2.jar:

org/apache/flink/connectors/hive/HiveTableSink.java, at line 134, add:

{code:java}
if (sd.getOutputFormat() == null
        && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setOutputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
}
if (sd.getOutputFormat() == null
        && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setOutputFormat("org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
}
{code}

org/apache/flink/connectors/hive/read/HiveTableInputFormat.java, at line 305, add:

{code:java}
if (sd.getInputFormat() == null
        && "org.apache.hadoop.hive.hbase.HBaseSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setInputFormat("org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat");
    jobConf.set("hbase.table.name", partition.getTableProps().getProperty("hbase.table.name"));
    jobConf.set("hbase.columns.mapping", partition.getTableProps().getProperty("hbase.columns.mapping"));
}
if (sd.getInputFormat() == null
        && "org.elasticsearch.hadoop.hive.EsSerDe".equals(sd.getSerdeInfo().getSerializationLib())) {
    sd.setInputFormat("org.elasticsearch.hadoop.hive.EsHiveInputFormat");
    jobConf.set("location", sd.getLocation());
    // copy all es.* table properties to the job conf (needs java.util.Enumeration imported)
    for (Enumeration<Object> en = partition.getTableProps().keys(); en.hasMoreElements();) {
        String key = en.nextElement().toString();
        if (key.startsWith("es.")) {
            jobConf.set(key, partition.getTableProps().getProperty(key));
        }
    }
}
{code}
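For what it's worth, both patches share the same shape: when the metastore StorageDescriptor of a storage-handler table leaves the input/output format null, fall back to a format derived from the SerDe class. That could also be written as a single table-driven helper. Below is a minimal sketch, not the committed fix; the class name StorageHandlerFormats and the two method names are hypothetical, while the SerDe and format class names and the StorageDescriptor/JobConf calls are taken from the patch above.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.mapred.JobConf;

public final class StorageHandlerFormats {

    // SerDe class -> format class fallbacks for storage-handler tables.
    private static final Map<String, String> INPUT_FORMATS = new HashMap<>();
    private static final Map<String, String> OUTPUT_FORMATS = new HashMap<>();

    static {
        INPUT_FORMATS.put("org.apache.hadoop.hive.hbase.HBaseSerDe",
                "org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat");
        INPUT_FORMATS.put("org.elasticsearch.hadoop.hive.EsSerDe",
                "org.elasticsearch.hadoop.hive.EsHiveInputFormat");
        OUTPUT_FORMATS.put("org.apache.hadoop.hive.hbase.HBaseSerDe",
                "org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat");
        OUTPUT_FORMATS.put("org.elasticsearch.hadoop.hive.EsSerDe",
                "org.elasticsearch.hadoop.hive.EsHiveOutputFormat");
    }

    /** Fill in the missing input format and copy handler properties to the job conf. */
    static void patchForRead(StorageDescriptor sd, Properties tableProps, JobConf jobConf) {
        if (sd.getInputFormat() != null) {
            return; // regular table, nothing to do
        }
        String inputFormat = INPUT_FORMATS.get(sd.getSerdeInfo().getSerializationLib());
        if (inputFormat == null) {
            return; // unknown storage handler, leave it to the existing error path
        }
        sd.setInputFormat(inputFormat);
        // Both handlers read their configuration from the job conf, so the
        // relevant table properties have to be copied over.
        for (String key : tableProps.stringPropertyNames()) {
            if (key.startsWith("es.") || key.startsWith("hbase.")) {
                jobConf.set(key, tableProps.getProperty(key));
            }
        }
        if (sd.getLocation() != null) {
            jobConf.set("location", sd.getLocation());
        }
    }

    /** Fill in the missing output format before writing through the handler. */
    static void patchForWrite(StorageDescriptor sd) {
        if (sd.getOutputFormat() == null) {
            String outputFormat = OUTPUT_FORMATS.get(sd.getSerdeInfo().getSerializationLib());
            if (outputFormat != null) {
                sd.setOutputFormat(outputFormat);
            }
        }
    }
}
{code}

Keeping the SerDe-to-format mapping in one place would let further storage handlers be covered without touching both HiveTableSink and HiveTableInputFormat.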
--
This message was sent by Atlassian Jira
(v8.3.4#803005)