You could approach testing this in the same way that Flink has implemented
its own unit tests for KeyedBroadcastProcessFunctions, which is to use a
KeyedTwoInputStreamOperatorTestHarness with a CoBroadcastWithKeyedOperator.
To learn how to use Flink's test harnesses, see [1], and for an example of
testing a KeyedBroadcastProcessFunction, see [2].

[1] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/testing.html#unit-testing-stateful-or-timely-udfs--custom-operators

[2] https://github.com/apache/flink/blob/release-1.11/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/co/CoBroadcastWithKeyedOperatorTest.java
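To sketch what that might look like: the untested code below assumes that
your MyKeyedBroadCastProcessFunction extends
KeyedBroadcastProcessFunction<String, Tuple2<String, JSONObject>,
Tuple2<String, Map<String, String>>, Tuple2<String, JSONObject>>, matching
the pipeline in your message. The descriptor, rule content, and sample
signal are all made-up placeholders you would replace with your own, and I
am using JUnit 4's @Before here (with JUnit 5 it would be @BeforeEach):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedTwoInputStreamOperatorTestHarness;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class MyKeyedBroadCastProcessFunctionTest {

    // Placeholder descriptor -- it must be the same descriptor (name and
    // types) that MyKeyedBroadCastProcessFunction itself uses.
    private static final MapStateDescriptor<String, Map<String, String>> patternRuleDescriptor =
            new MapStateDescriptor<>(
                    "patternRules",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    new MapTypeInfo<>(String.class, String.class));

    private KeyedTwoInputStreamOperatorTestHarness<
            String,                               // key
            Tuple2<String, JSONObject>,           // keyed (signal) input
            Tuple2<String, Map<String, String>>,  // broadcast (rule) input
            Tuple2<String, JSONObject>>           // output
            harness;

    @Before
    public void setup() throws Exception {
        KeySelector<Tuple2<String, JSONObject>, String> keySelector =
                signal -> signal.f0;

        CoBroadcastWithKeyedOperator<
                String,
                Tuple2<String, JSONObject>,
                Tuple2<String, Map<String, String>>,
                Tuple2<String, JSONObject>> operator =
                new CoBroadcastWithKeyedOperator<>(
                        new MyKeyedBroadCastProcessFunction(),
                        Collections.singletonList(patternRuleDescriptor));

        harness = new KeyedTwoInputStreamOperatorTestHarness<>(
                operator,
                keySelector,
                null,  // the broadcast side is not keyed
                BasicTypeInfo.STRING_TYPE_INFO);
        harness.open();

        // Broadcast the rules exactly once; every signal processed later
        // in the test sees this broadcast state.
        Map<String, String> rule = new HashMap<>();
        rule.put("condition", "value > 10");  // made-up rule content
        harness.processElement2(new StreamRecord<>(new Tuple2<>("rule1", rule), 0L));
    }

    @After
    public void cleanup() throws Exception {
        harness.close();
    }

    @Test
    public void matchingSignalIsEmitted() throws Exception {
        // Feed as many signals as you like without rebuilding anything.
        JSONObject signal = new JSONObject("{\"source\":\"sensor-1\",\"value\":42}");
        harness.processElement1(new StreamRecord<>(new Tuple2<>("sensor-1", signal), 1L));

        @SuppressWarnings("unchecked")
        StreamRecord<Tuple2<String, JSONObject>> out =
                (StreamRecord<Tuple2<String, JSONObject>>) harness.getOutput().poll();
        // assert on out.getValue().f1 here
    }
}

Because the harness calls the function directly, there is no
ExecutionEnvironment, no Kafka source, and no static sink involved, and the
broadcast state set up in @Before is in place for every signal the test then
feeds in.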
Best,
David

On Wed, Jul 15, 2020 at 8:32 PM bujjirahul45 <[hidden email]> wrote:

> Hi,
>
> I am new to Flink and I am trying to write JUnit test cases to test a
> KeyedBroadcastProcessFunction. Below is my code. I currently call the
> getDataStreamOutput method in a TestUtils class, passing input data and
> pattern rules to the method; once the input data has been evaluated
> against the list of pattern rules and the input satisfies a condition, I
> get the signal, call the sink function, and return the output data as a
> String from getDataStreamOutput:
>
>     @Test
>     public void testCompareInputAndOutputDataForInputSignal() throws Exception {
>         Assertions.assertEquals(sampleInputSignal,
>                 TestUtils.getDataStreamOutput(inputSignal, patternRules));
>     }
>
>     public static String getDataStreamOutput(JSONObject input,
>             Map<String, String> patternRules) throws Exception {
>
>         env.setParallelism(1);
>
>         DataStream<JSONObject> inputSignal = env.fromElements(input);
>
>         DataStream<Map<String, String>> rawPatternStream =
>                 env.fromElements(patternRules);
>
>         // Generate key/value pairs of patterns, where the key is the
>         // pattern name and the value is the pattern condition
>         DataStream<Tuple2<String, Map<String, String>>> patternRuleStream =
>                 rawPatternStream.flatMap(
>                         new FlatMapFunction<Map<String, String>,
>                                 Tuple2<String, Map<String, String>>>() {
>                     @Override
>                     public void flatMap(Map<String, String> patternRules,
>                             Collector<Tuple2<String, Map<String, String>>> out)
>                             throws Exception {
>                         for (Map.Entry<String, String> stringEntry :
>                                 patternRules.entrySet()) {
>                             JSONObject jsonObject =
>                                     new JSONObject(stringEntry.getValue());
>                             Map<String, String> map = new HashMap<>();
>                             for (String key : jsonObject.keySet()) {
>                                 map.put(key, jsonObject.get(key).toString());
>                             }
>                             out.collect(new Tuple2<>(stringEntry.getKey(), map));
>                         }
>                     }
>                 });
>
>         BroadcastStream<Tuple2<String, Map<String, String>>> patternRuleBroadcast =
>                 patternRuleStream.broadcast(patternRuleDescriptor);
>
>         DataStream<Tuple2<String, JSONObject>> validSignal = inputSignal
>                 .map(new MapFunction<JSONObject, Tuple2<String, JSONObject>>() {
>                     @Override
>                     public Tuple2<String, JSONObject> map(JSONObject inputSignal)
>                             throws Exception {
>                         String source = inputSignal.getString("source");
>                         return new Tuple2<>(source, inputSignal);
>                     }
>                 })
>                 .keyBy(0)
>                 .connect(patternRuleBroadcast)
>                 .process(new MyKeyedBroadCastProcessFunction());
>
>         validSignal.map(new MapFunction<Tuple2<String, JSONObject>, JSONObject>() {
>             @Override
>             public JSONObject map(Tuple2<String, JSONObject> inputSignal)
>                     throws Exception {
>                 return inputSignal.f1;
>             }
>         }).addSink(new getDataStreamOutput());
>
>         env.execute("TestFlink");
>
>         return getDataStreamOutput.dataStreamOutput;
>     }
>
>     @SuppressWarnings("serial")
>     public static final class getDataStreamOutput implements SinkFunction<JSONObject> {
>         public static String dataStreamOutput;
>
>         @Override
>         public void invoke(JSONObject inputSignal) throws Exception {
>             dataStreamOutput = inputSignal.toString();
>         }
>     }
>
> I need to test different inputs against the same broadcast rules, but each
> time I call this function it runs the whole process from the beginning:
> take the input signal, broadcast the data, and so on. Is there a way I can
> broadcast once and keep sending inputs to the method? I explored combining
> the data streams with a CoFlatMapFunction, something like the code below,
> so I could keep sending input rules while the method is running, but then
> one of the data streams has to keep reading from a Kafka topic, which again
> overburdens the method with loading the Kafka utils and server:
>
>     DataStream<JSONObject> inputSignalFromKafka =
>             env.addSource(inputSignalKafka);
>
>     DataStream<org.json.JSONObject> inputSignalFromMethod =
>             env.fromElements(inputSignal);
>
>     DataStream<JSONObject> inputSignal = inputSignalFromMethod
>             .connect(inputSignalFromKafka)
>             .flatMap(new SignalCoFlatMapper());
>
>     public static class SignalCoFlatMapper
>             implements CoFlatMapFunction<JSONObject, JSONObject, JSONObject> {
>
>         @Override
>         public void flatMap1(JSONObject inputValue, Collector<JSONObject> out)
>                 throws Exception {
>             out.collect(inputValue);
>         }
>
>         @Override
>         public void flatMap2(JSONObject kafkaValue, Collector<JSONObject> out)
>                 throws Exception {
>             out.collect(kafkaValue);
>         }
>     }
>
> I found a link on Stack Overflow ("How to unit test BroadcastProcessFunction
> in flink when processElement depends on broadcasted data"), but it confused
> me a lot.
>
> Is there any way I can broadcast only once, in a @Before method in my test
> cases, and keep sending different kinds of data to my broadcast function?
>
> Thanks,
> Rahul.