xiaohang.li created FLINK-18960:
-----------------------------------
Summary: flink sideoutput union
Key: FLINK-18960
URL: https://issues.apache.org/jira/browse/FLINK-18960
Project: Flink
Issue Type: Bug
Components: API / DataStream
Affects Versions: 1.10.1
Environment:
val side = new OutputTag[String]("side")
val side2 = new OutputTag[String]("side2")
val side3 = new OutputTag[String]("side3")
val ds = env.socketTextStream("master", 9001)
val res = ds.process(new ProcessFunction[String, String] {
  override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
    if (value.contains("hello")) {
      ctx.output(side, value)
    } else if (value.contains("world")) {
      ctx.output(side2, value)
    } else if (value.contains("flink")) {
      ctx.output(side3, value)
    }
    out.collect(value)
  }
})
val res1 = res.getSideOutput(side)
val res2 = res.getSideOutput(side2)
val res3 = res.getSideOutput(side3)
println("====>" + res1.getClass)
println("====>" + res2.getClass)
res1.print("res1")
res2.print("res2")
res3.print("res3")
res2.union(res1).union(res3).print("all")
Enter the following lines on the socket port, one at a time:
hello
world
flink
IDEA shows the following output:
res1> hello
res2> world
res3> flink
all> flink
all> flink
all> flink
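As shown, the "all" output stream contains the records of the last unioned side output, repeated once per unioned stream. The expected output is: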
all>flink
Reporter: xiaohang.li
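Data is wrong when unioning Flink side outputs. When side output streams split off from the main stream are unioned, the output shown is the data of the last unioned stream, repeated once per unioned stream.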
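For comparison, here is a minimal sketch of what the "all" stream would be expected to carry, assuming union simply merges the three side outputs without duplicating or dropping records. It models the routing from processElement with plain Scala collections and does not use Flink. The names ExpectedUnionOutput, route and sideOutput are hypothetical helpers introduced for illustration only.

```scala
// Plain-Scala model of the routing in processElement; no Flink dependency.
object ExpectedUnionOutput {

  // Mirrors the if / else-if chain in the report: returns the name of the
  // side output tag a value would be routed to, if any.
  def route(value: String): Option[String] =
    if (value.contains("hello")) Some("side")
    else if (value.contains("world")) Some("side2")
    else if (value.contains("flink")) Some("side3")
    else None

  // The values a given side output would carry for the given input.
  def sideOutput(tag: String, input: Seq[String]): Seq[String] =
    input.filter(v => route(v).contains(tag))

  def main(args: Array[String]): Unit = {
    val input = Seq("hello", "world", "flink")
    val res1 = sideOutput("side", input)
    val res2 = sideOutput("side2", input)
    val res3 = sideOutput("side3", input)

    // Concatenated in the same order as res2.union(res1).union(res3).
    // A real streaming union gives no ordering guarantee across inputs.
    val all = res2 ++ res1 ++ res3
    all.foreach(v => println(s"all> $v"))
  }
}
```

Under this assumption each of hello, world and flink appears in "all" exactly once, whereas the job in the report prints flink three times.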
--
This message was sent by Atlassian Jira
(v8.3.4#803005)