package org.ua.ap; import org.apache.flink.api.common.ProgramDescription; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.graph.Edge; import org.apache.flink.graph.Graph; import org.apache.flink.graph.Vertex; import org.apache.flink.util.Collector; public class AffinityPropogationExample implements ProgramDescription{ public static final String vertexPreferenceInputPath = "ToyProblemPreferences.txt"; public static final String similarGraphInputPath = "ToyProblemSimilarities.txt"; public static final int maxIterations = 100; @Override public String getDescription() { // TODO Auto-generated method stub return "This is an example of Affinity Propogation"; } public static void main(String[] args) throws Exception{ /*set up environment*/ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); /*Set up graph*/ DataSet> vertices = getVertexDataSet(env); DataSet> distanceEdges = getEdgeDataSet(env); DataSet> selfEdges = getPreferenceSet(env); DataSet> allEdges = distanceEdges.union(selfEdges); Graph graph = Graph.fromDataSet(vertices, allEdges, env); /*Run affinity propagation algorithm*/ Graph result = graph.run(new AffinityPropogation(100, 0.5)); result.getVertices().print(); // result.getVertices().writeAsCsv("result.txt","\n", " "); /*Test*/ env.execute("Affinity Propogation Example"); } @SuppressWarnings("serial") public static DataSet> getVertexDataSet(ExecutionEnvironment env){ return env.readCsvFile(vertexPreferenceInputPath). fieldDelimiter(' ').lineDelimiter("\n"). types(Long.class, Double.class). map(new MapFunction, Vertex>() { @Override public Vertex map(Tuple2 tuple2) throws Exception { return new Vertex(tuple2.f0, tuple2.f0); } }); } @SuppressWarnings("serial") public static DataSet> getEdgeDataSet(ExecutionEnvironment env){ return env.readCsvFile(similarGraphInputPath) .fieldDelimiter(' ').lineDelimiter("\n") .types(Long.class, Long.class, Double.class) .map(new MapFunction, Edge>(){ @Override public Edge map( Tuple3 tuple3) throws Exception { // TODO Auto-generated method stub return new Edge(tuple3.f0, tuple3.f1,tuple3.f2); } }); } @SuppressWarnings("serial") public static DataSet> getPreferenceSet(ExecutionEnvironment env){ return env.readCsvFile(vertexPreferenceInputPath) .fieldDelimiter(' ').lineDelimiter("\n") .types(Long.class, Double.class) .map(new MapFunction, Edge>(){ @Override public Edge map(Tuple2 value) throws Exception { return new Edge(value.f0, value.f0, value.f1); } }); } }