Hi Flink devs,
this is my final report on the HBaseOutputFormat problem (with Flink 0.8.1), and I hope you can suggest the best way to turn it into a PR:

1) The following code produces the error reported below (this should be fixed in 0.9, right?):

    Job job = Job.getInstance();
    myDataset.output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

org.apache.flink.api.common.functions.InvalidTypesException: Interfaces and abstract classes are not valid types: class org.apache.hadoop.hbase.client.Mutation
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:160)

2) So I created a custom HBaseTableOutputFormat (see the end of this mail), basically copied from the HBase TableOutputFormat, which correctly sets the "mapred.output.dir" parameter required by HadoopOutputFormatBase, so I can make it work:

    Job job = Job.getInstance();
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
    HBaseTableOutputFormat<Text> hbaseTOF = new HBaseTableOutputFormat<>();
    HadoopOutputFormat<Text, Put> outOF = new HadoopOutputFormat<>(hbaseTOF, job);
    myDataset.output(outOF);

3) However, this still does not work unless HadoopOutputFormatBase calls setConf() on wrapped output formats that implement Configurable:

- in the public void finalizeGlobal(int parallelism) throws IOException method:

    ....
    if (this.mapreduceOutputFormat instanceof Configurable) {
        ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
    }
    this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), taskContext);
    ....

- in the public void open(int taskNumber, int numTasks) throws IOException method:

    ....
    if (this.mapreduceOutputFormat instanceof Configurable) {
        ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
    }
    try {
        this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    ....

4) The modifications described in point 3 should probably be applied to both the mapreduce and the mapred packages (a rough sketch of a possible mapred variant follows the attached class at the end of this mail).
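To make the change in point 3 a bit more concrete: since the same check would appear in both open() and finalizeGlobal(), it could be factored into a small private helper inside the mapreduce HadoopOutputFormatBase. This is only a rough sketch based on the fields quoted above (mapreduceOutputFormat, configuration); the helper name is made up and not part of the current Flink code:

    // requires org.apache.hadoop.conf.Configurable to be imported
    private void configureOutputFormatIfPossible() {
        // Formats like the HBase one configure themselves (create the HTable,
        // set "mapred.output.dir", ...) only when setConf() is called on them.
        if (this.mapreduceOutputFormat instanceof Configurable) {
            ((Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
        }
    }

open(int, int) and finalizeGlobal(int) would then simply call configureOutputFormatIfPossible() before touching "mapred.output.dir".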
Thanks in advance,
Flavio

-----------------------------------------------------------------------
this is HBaseTableOutputFormat.java (the custom class referenced in point 2):
-----------------------------------------------------------------------

import java.io.IOException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.mapreduce.TableOutputCommitter;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

/**
 * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
 * while the output value <u>must</u> be either a {@link Put} or a
 * {@link Delete} instance.
 *
 * @param <KEY> The type of the key. Ignored in this class.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class HBaseTableOutputFormat<KEY> extends OutputFormat<KEY, Put> implements Configurable {

  private final Log LOG = LogFactory.getLog(HBaseTableOutputFormat.class);

  /** Job parameter that specifies the output table. */
  public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

  /**
   * Optional job parameter to specify a peer cluster.
   * Used to specify the remote cluster when copying between HBase clusters
   * (the source is picked up from <code>hbase-site.xml</code>).
   * @see TableMapReduceUtil#initTableReducerJob(String, Class, org.apache.hadoop.mapreduce.Job, Class, String, String, String)
   */
  public static final String QUORUM_ADDRESS = "hbase.mapred.output.quorum";

  /** Optional job parameter to specify the peer cluster's ZK client port. */
  public static final String QUORUM_PORT = "hbase.mapred.output.quorum.port";

  /** Optional specification of the rs class name of the peer cluster. */
  public static final String REGION_SERVER_CLASS = "hbase.mapred.output.rs.class";

  /** Optional specification of the rs impl name of the peer cluster. */
  public static final String REGION_SERVER_IMPL = "hbase.mapred.output.rs.impl";

  /** The configuration. */
  private Configuration conf = null;

  private HTable table;

  /**
   * Writes the reducer output to an HBase table.
   *
   * @param <KEY> The type of the key.
   */
  protected static class TableRecordWriter<KEY> extends RecordWriter<KEY, Put> {

    /** The table to write to. */
    private HTable table;

    /**
     * Instantiate a TableRecordWriter with the HBase HClient for writing.
     *
     * @param table The table to write to.
     */
    public TableRecordWriter(HTable table) {
      this.table = table;
    }

    /**
     * Closes the writer, in this case flushing the table commits.
     *
     * @param context The context.
     * @throws IOException When closing the writer fails.
     * @see org.apache.hadoop.mapreduce.RecordWriter#close(org.apache.hadoop.mapreduce.TaskAttemptContext)
     */
    @Override
    public void close(TaskAttemptContext context) throws IOException {
      table.close();
    }

    /**
     * Writes a key/value pair into the table.
     *
     * @param key The key.
     * @param value The value.
     * @throws IOException When writing fails.
     * @see org.apache.hadoop.mapreduce.RecordWriter#write(java.lang.Object, java.lang.Object)
     */
    @Override
    public void write(KEY key, Put value) throws IOException {
      if (value instanceof Put) this.table.put(new Put((Put) value));
      // else if (value instanceof Delete) this.table.delete(new Delete((Delete) value));
      else throw new IOException("Pass a Delete or a Put");
    }
  }

  /**
   * Creates a new record writer.
   *
   * @param context The current task context.
   * @return The newly created writer instance.
   * @throws IOException When creating the writer fails.
   * @throws InterruptedException When the job is cancelled.
   * @see org.apache.hadoop.mapreduce.lib.output.FileOutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public RecordWriter<KEY, Put> getRecordWriter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableRecordWriter<KEY>(this.table);
  }

  /**
   * Checks if the output target exists.
   *
   * @param context The current context.
   * @throws IOException When the check fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
   */
  @Override
  public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException {
    // TODO Check if the table exists?
  }

  /**
   * Returns the output committer.
   *
   * @param context The current context.
   * @return The committer.
   * @throws IOException When creating the committer fails.
   * @throws InterruptedException When the job is aborted.
   * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
   */
  @Override
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    return new TableOutputCommitter();
  }

  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration otherConf) {
    this.conf = HBaseConfiguration.create(otherConf);

    String tableName = this.conf.get(OUTPUT_TABLE);
    if (tableName == null || tableName.length() <= 0) {
      throw new IllegalArgumentException("Must specify table name");
    }

    String address = this.conf.get(QUORUM_ADDRESS);
    int zkClientPort = this.conf.getInt(QUORUM_PORT, 0);
    String serverClass = this.conf.get(REGION_SERVER_CLASS);
    String serverImpl = this.conf.get(REGION_SERVER_IMPL);

    try {
      if (address != null) {
        ZKUtil.applyClusterKeyToConf(this.conf, address);
      }
      if (serverClass != null) {
        this.conf.set(HConstants.REGION_SERVER_IMPL, serverImpl);
      }
      if (zkClientPort != 0) {
        this.conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, zkClientPort);
      }
      this.table = new HTable(this.conf, tableName);
      this.table.setAutoFlush(false, true);
      // Addition w.r.t. HBase's TableOutputFormat: derive the table directory and
      // expose it as "mapred.output.dir" so that Flink's HadoopOutputFormatBase
      // can build its FileOutputCommitter from it.
      String outDir = FSUtils.getTableDir(FSUtils.getRootDir(conf), this.table.getName()).toString();
      this.conf.set("mapred.output.dir", outDir);
      otherConf.set("mapred.output.dir", outDir);
      LOG.info("Created table instance for " + tableName);
    } catch (IOException e) {
      LOG.error(e);
      throw new RuntimeException(e);
    }
  }
}
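Regarding point 4: in the mapred API the self-configuration hook is usually JobConfigurable.configure(JobConf) rather than Configurable.setConf(), so the mapred-package HadoopOutputFormatBase would need a slightly different check. The following is only a hypothetical sketch; the field names mapredOutputFormat and jobConf are assumptions and may not match the actual Flink code:

    // assumed imports: org.apache.hadoop.conf.Configurable,
    // org.apache.hadoop.mapred.JobConf, org.apache.hadoop.mapred.JobConfigurable
    private void configureOutputFormatIfPossible() {
        // mapred output formats typically configure themselves via JobConfigurable,
        // but some may implement Configurable instead, so check both.
        if (this.mapredOutputFormat instanceof JobConfigurable) {
            ((JobConfigurable) this.mapredOutputFormat).configure(this.jobConf);
        } else if (this.mapredOutputFormat instanceof Configurable) {
            // JobConf extends Configuration, so it can be passed to setConf() directly
            ((Configurable) this.mapredOutputFormat).setConf(this.jobConf);
        }
    }

As in the mapreduce case, open() and finalizeGlobal() would call this helper before relying on "mapred.output.dir".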
Any feedback about this?
As I said before, I think the wrapped Hadoop OutputFormat should be configured (i.e., setConf() called on Configurable formats) in the configure() method of the Flink HadoopOutputFormatBase. Flink calls configure() before open() and finalizeGlobal(), so that should work. Have you checked whether that fixes your problem? If yes, I'd suggest opening a PR with this fix.

Thanks,
Fabian
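For reference, a minimal sketch of that approach, assuming the field names used earlier in this thread (mapreduceOutputFormat, configuration) and Flink's OutputFormat.configure(org.apache.flink.configuration.Configuration) lifecycle method; it is a sketch of the idea, not the final implementation:

    @Override
    public void configure(org.apache.flink.configuration.Configuration parameters) {
        // configure() runs before open() and before the format is finalized, so a
        // Configurable Hadoop output format gets the chance to set
        // "mapred.output.dir" (and anything else it needs) exactly once, up front.
        if (this.mapreduceOutputFormat instanceof org.apache.hadoop.conf.Configurable) {
            ((org.apache.hadoop.conf.Configurable) this.mapreduceOutputFormat).setConf(this.configuration);
        }
    }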
Hi Flavio,

Thanks for looking into this problem. Actually, it's a bit difficult to discuss your changes here because of the formatting/syntax highlighting and the missing context of the classes. Usually we do that in a pull request. Do you have a GitHub account? If so, push your changes to your forked Flink repository; GitHub will then offer to create a pull request for your modified branch.

Let's discuss your changes on GitHub.

Best,
Max
OK, I'd like to have this fix in the next release. Should I branch from Flink 0.8.1, 0.9, or some other version?
Whatever works best for you.
We can easily backport or forward-port the patch.
In reply to this post by Flavio Pompermaier
Just base your changes on the current master.
In reply to this post by Fabian Hueske-2
Now I made my fork (https://github.com/fpompermaier/flink), but when I run
the application I get this error:

java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at java.util.ArrayList.writeObject(ArrayList.java:742)
    at sun.reflect.GeneratedMethodAccessor2.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:286)
    at org.apache.flink.runtime.taskmanager.TaskExecutionState.<init>(TaskExecutionState.java:74)

I started from the WordCount example and my code is:

Job job = Job.getInstance();
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, outputTableName);
job.getConfiguration().set("mapred.output.dir", "/tmp/test");
counts.map(new MapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>>() {
    private final byte[] CF_SOME = Bytes.toBytes("test-column");
    private final byte[] Q_SOME = Bytes.toBytes("value");
    private Tuple2<Text, Mutation> reuse = new Tuple2<Text, Mutation>();

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        reuse.f0 = new Text(t.f0);
        Put put = new Put(t.f0.getBytes());
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f1 = put;
        return reuse;
    }
}).output(new HadoopOutputFormat<Text, Mutation>(new TableOutputFormat<Text>(), job));

Do I have to register how to serialize Put somewhere?
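For reference, registering the HBase client types with Flink's Kryo-based serializer for generic types would look roughly like the sketch below. This is only a sketch and makes two assumptions: that the ExecutionConfig of the version you build against already exposes registerKryoType(), and that the client-side serialization shown in the trace above (TaskExecutionState) goes through the same path, which is not certain.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;

public class KryoRegistrationSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Register the HBase client classes at Flink's Kryo serializer so that
        // Tuple2<Text, Mutation> records are treated as registered generic types.
        env.getConfig().registerKryoType(Mutation.class);
        env.getConfig().registerKryoType(Put.class);

        // ... build the WordCount-style plan and the HadoopOutputFormat sink here ...

        env.execute("hbase-output-sketch");
    }
}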
If Put is not Serializable it cannot be serialized and shipped.
Is it possible to make that field transient and initialize Put in configure()?
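For illustration, a minimal sketch of that idea, assuming a switch to a RichMapFunction is acceptable (the PutMapper name is made up, and open() plays the role of configure() here): the reuse tuple is declared transient so that no Put/Mutation instance travels with the serialized function, and it is created in open() instead of at construction time.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class PutMapper extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>> {

    private static final byte[] CF_SOME = Bytes.toBytes("test-column");
    private static final byte[] Q_SOME = Bytes.toBytes("value");

    // transient: the tuple (and any Mutation it may hold) is never shipped
    // with the serialized function object.
    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public void open(Configuration parameters) {
        reuse = new Tuple2<Text, Mutation>();
    }

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        reuse.f0 = new Text(t.f0);
        Put put = new Put(Bytes.toBytes(t.f0));
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f1 = put;
        return reuse;
    }
}

It would then be used as counts.map(new PutMapper()) in place of the anonymous MapFunction.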
Which field? The Tuple2? I use it with Flink 0.8.1 without errors.
On Apr 3, 2015 2:27 AM, <[hidden email]> wrote:
> If Put is not Serializable it cannot be serialized and shipped.
>
> Is it possible to make that field transient and initialize Put in
> configure()?
If so, push your changes to your forked > > Flink > > > > repository. GitHub will then offer you to create a pull request for > > your > > > > modified branch. > > > > > > > > Let's discuss your changes on GitHub. > > > > > > > > Best, > > > > Max > > > > > > > > On Wed, Apr 1, 2015 at 1:44 PM, Flavio Pompermaier < > > [hidden email] > > > > > > > > wrote: > > > > > > > > > Any feedback about this? > > > > > > > > > > On Tue, Mar 31, 2015 at 7:07 PM, Flavio Pompermaier < > > > > [hidden email]> > > > > > wrote: > > > > > > > > > > > Hi Flink devs, > > > > > > this is my final report about the HBaseOutputFormat problem (with > > > Flink > > > > > > 0.8.1) and I hope you could suggest me the best way to make a PR: > > > > > > > > > > > > 1) The following code produce the error reported below (this > should > > > be > > > > > > fixed in 0.9 right?) > > > > > > Job job = Job.getInstance(); > > > > > > myDataset.output( new HadoopOutputFormat<Text, *Mutation*>(new > > > > > > *TableOutputFormat*<Text>(), job)); > > > > > > > > > > > > org.apache.flink.api.common.functions.InvalidTypesException: > > > Interfaces > > > > > > and abstract classes are not valid types: class > > > > > > org.apache.hadoop.hbase.client.Mutation > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:885) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:877) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:376) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:296) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:224) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:152) > > > > > > at > > > > > > > > > > > > > > > > > > > > > org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:79) > > > > > > at org.apache.flink.api.java.DataSet.map(DataSet.java:160) > > > > > > > > > > > > 2) So I created a custom HBaseTableOutputFormat -*see at the end > > of > > > > the > > > > > > mail-* (that is basically copied from to the HBase > > TableInputFormat) > > > > that > > > > > > sets correctly the "mapred.output.dir" param required by the > > > > > > HadoopOutputFormatBase so I can make it work: > > > > > > Job job = Job.getInstance(); > > > > > > job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, > > > > > > outputTableName); > > > > > > HBaseTableOutputFormat<Text> hbaseTOF = new > > > HBaseTableOutputFormat<>(); > > > > > > HadoopOutputFormat<Text, Put> outOF = new > > > > > > HadoopOutputFormat<>(hbaseTOF, job); > > > > > > myDataset.output(outOF); > > > > > > > > > > > > 3) However this does still not work unless you call setConf() of > > > > > > Configurable subclasses in the HadoopOutputFormatBase: > > > > > > > > > > > > - in the* public void finalizeGlobal(int parallelism) throws > > > > IOException* > > > > > > method: > > > > > > .... 
Hey Flavio,
I checked out your "master" branch and started the HBaseWriteExample. It started without errors (there were some errors connecting to Zookeeper, but that's probably because I don't have HBase running).

Am I using the right code (https://github.com/fpompermaier/flink/commit/c1934da379dba360ad61d18bf921fae08822795a) to reproduce this error? Maybe the error is also happening when the mapper is starting.

Can you try making the following changes to your code?
https://gist.github.com/rmetzger/a218beca4b0442f3c1f3
This basically makes the field that contains the non-serializable "Put" element transient.

On Sat, Apr 4, 2015 at 8:40 AM, Flavio Pompermaier <[hidden email]> wrote:

> Any fix for this?
> On Apr 3, 2015 7:43 AM, "Flavio Pompermaier" <[hidden email]> wrote:
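The failure Robert is trying to reproduce can also be shown outside Flink with plain Java serialization. The sketch below is only illustrative (it is not the content of the linked gist, and the class names are invented): it tries to Java-serialize an object that keeps a Put in a non-transient field, which is essentially what happens when Flink ships a user function at job submission.

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutSerializationCheck {

    // Stands in for a user function that keeps a Put in a member field.
    static class FunctionLikeHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private Put put = new Put(Bytes.toBytes("some-row"));
    }

    public static void main(String[] args) throws Exception {
        ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream());
        // Throws java.io.NotSerializableException: org.apache.hadoop.hbase.client.Put,
        // mirroring the stack trace reported earlier in the thread.
        out.writeObject(new FunctionLikeHolder());
    }
}

Because Put is not java.io.Serializable, any non-transient field that holds one makes the enclosing function object unserializable.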
There's no way to register Put with Kryo, for example?
User functions are still serialized using Java serialization, not Kryo.
Kryo is only used for data exchange at runtime between tasks.

If a function such as your MapFunction has a non-serializable member variable, you need to declare it as transient and initialize it before it is executed, e.g., via open() or on the first invocation of the function's processing method, such as map().
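Applied to the mapper posted earlier in the thread, this advice might look roughly like the sketch below (illustrative only: the class name is invented, the column/qualifier constants follow the earlier snippet, and the transient reuse tuple is created lazily on the first call to map()).

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class WordCountToMutationMapper
        implements MapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>> {

    private static final long serialVersionUID = 1L;

    private static final byte[] CF_SOME = Bytes.toBytes("test-column");
    private static final byte[] Q_SOME = Bytes.toBytes("value");

    // transient: never shipped with the serialized function object
    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        if (reuse == null) {
            // created lazily on the worker, after the function has been deserialized
            reuse = new Tuple2<Text, Mutation>();
        }
        Put put = new Put(Bytes.toBytes(t.f0));
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f0 = new Text(t.f0);
        reuse.f1 = put;
        return reuse;
    }
}

The anonymous inner class from the original snippet works the same way; the only essential changes are the transient keyword and the lazy initialization.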
Ok, great! This was not perfectly clear to me! I'll try that now.
About the reuse variable: I use it because I saw that it is a common practice in the examples, but I'd like to know whether there's a real benefit in reusing it for the return tuple compared to returning a brand new one each time. Any insight about this?
Yes, reusing output objects is a good practice but optional. It can help to bring down GC overhead.
You could make your function a RichFunction and initialize the output object in open().

Switching function serialization to Kryo is on our TODO list (FLINK-1256). Would be good to fix that soon, IMO.

Cheers, Fabian
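Combining the two suggestions (RichFunction plus object reuse), a sketch might look like the following; the class name is invented and the API names assume the 0.8/0.9-era Java API, so treat it as an illustration rather than a definitive implementation.

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class WordCountToMutationRichMapper
        extends RichMapFunction<Tuple2<String, Integer>, Tuple2<Text, Mutation>> {

    private static final long serialVersionUID = 1L;

    private static final byte[] CF_SOME = Bytes.toBytes("test-column");
    private static final byte[] Q_SOME = Bytes.toBytes("value");

    private transient Tuple2<Text, Mutation> reuse;

    @Override
    public void open(Configuration parameters) {
        // called once per parallel task instance, after deserialization,
        // so the reused output object never has to be serialized
        reuse = new Tuple2<Text, Mutation>();
    }

    @Override
    public Tuple2<Text, Mutation> map(Tuple2<String, Integer> t) throws Exception {
        Put put = new Put(Bytes.toBytes(t.f0));
        put.add(CF_SOME, Q_SOME, Bytes.toBytes(t.f1));
        reuse.f0 = new Text(t.f0);
        reuse.f1 = put;
        return reuse;
    }
}

It would then be used in place of the anonymous mapper, e.g. counts.map(new WordCountToMutationRichMapper()).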
I created a JIRA ticket for this problem (https://issues.apache.org/jira/browse/FLINK-1828) and I just made a PR that fixes it (thanks Fabian and Robert for the great support!).

Last question: is the "mapred.output.dir" parameter really necessary? At the end of my job that writes to HBase I found only _SUCCESS and ._SUCCESS.crc files there.

Best,
Flavio