[jira] [Created] (FLINK-18960) flink sideoutput union


Shang Yuanchun (Jira)
xiaohang.li created FLINK-18960:
-----------------------------------

             Summary: flink sideoutput union
                 Key: FLINK-18960
                 URL: https://issues.apache.org/jira/browse/FLINK-18960
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.10.1
         Environment: val side = new OutputTag[String]("side")
val side2 = new OutputTag[String]("side2")
val side3 = new OutputTag[String]("side3")
val ds = env.socketTextStream("master",9001)
val res = ds.process(new ProcessFunction[String,String] {
  override def processElement(value: String, ctx: ProcessFunction[String, String]#Context, out: Collector[String]): Unit = {
    if (value.contains("hello")) {
      ctx.output(side, value)
    } else if (value.contains("world")) {
      ctx.output(side2, value)
    } else if (value.contains("flink")) {
      ctx.output(side3, value)
    }
    out.collect(value)
  }
})

val res1 = res.getSideOutput(side)
val res2 = res.getSideOutput(side2)
val res3 = res.getSideOutput(side3)


println( "====>"+res1.getClass)
println( "====>"+res2.getClass)


res1.print("res1")
res2.print("res2")
res3.print("res3")

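res2.union(res1).union(res3).print("all")

Enter the following lines on the socket port, one at a time:

hello

world

flink

IDEA shows the following output:

res1> hello
res2> world
res3> flink
all> flink
all> flink
all> flink

As the output shows, the "all" stream prints the last side output passed to union, multiplied by the number of unions; the actual output should be:

all>flink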
            Reporter: xiaohang.li


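Data comes out wrong when side outputs are unioned in Flink. When side output streams split from the main stream are unioned, the output shown is the data of the last unioned stream, multiplied by the number of unions.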
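
For comparison, the result one would expect from the union can be modelled in plain Scala, without Flink. This is only a sketch under the assumption that union should forward every element of every input stream exactly once. The object and method names (SideOutputUnionModel, route, sideOutput, expectedAll) are hypothetical and do not come from the reproduction code. Ordering in a real streaming job depends on arrival order, so only the set of elements is meaningful here.

```scala
// Plain-Scala model of the routing in the reproduction; no Flink involved.
// All names in this object are hypothetical and exist only for illustration.
object SideOutputUnionModel {

  // Mirrors the if / else-if chain in processElement:
  // returns the name of the side output tag an element is routed to, if any.
  def route(value: String): Option[String] =
    if (value.contains("hello")) Some("side")
    else if (value.contains("world")) Some("side2")
    else if (value.contains("flink")) Some("side3")
    else None

  // Elements that the side output with the given tag name would carry.
  def sideOutput(input: Seq[String], tag: String): Seq[String] =
    input.filter(v => route(v).contains(tag))

  // Assumed union semantics: each element of each side output appears once.
  // Concatenation order follows res2.union(res1).union(res3) from the report.
  def expectedAll(input: Seq[String]): Seq[String] =
    sideOutput(input, "side2") ++ sideOutput(input, "side") ++ sideOutput(input, "side3")

  def main(args: Array[String]): Unit =
    expectedAll(Seq("hello", "world", "flink")).foreach(v => println(s"all> $v"))
}
```

Under this model each of the three inputs shows up once in the "all" stream, whereas the output reported above contains "flink" three times and neither "hello" nor "world".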

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)