xingyuan cheng created FLINK-17304:
-------------------------------------- Summary: Kafka two streams cannot use Flink SQL to query inner join Key: FLINK-17304 URL: https://issues.apache.org/jira/browse/FLINK-17304 Project: Flink Issue Type: Bug Components: API / DataStream, Table SQL / API Affects Versions: 1.9.0 Environment: flink.version=1.9.0 scala.binary.version=2.11 Reporter: xingyuan cheng In my work, I found that when consuming DataStreams from two different Kafka topics, the operators on each of the two streams execute correctly on their own, but in the end the inner join query over the two streams through Flink SQL does not return the expected result. What do I need to do to make it work? TestStreamSQL.java ``` public class TestStreamSQL { private static Logger log = LoggerFactory.getLogger(BinlogStreamSQL.class); public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); env.getCheckpointConfig().setCheckpointTimeout(60000); env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // env.setStateBackend(new FsStateBackend("hdfs://ido001:8020/user/lwj/flink/checkpoint")); StreamTableEnvironment tEnv = StreamTableEnvironment.create(env); StreamQueryConfig queryConfig = new StreamQueryConfig(); queryConfig.withIdleStateRetentionTime(Time.days(10), Time.days(30)); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "ido001.gzcb.com:9092,ido002.gzcb.com:9092,ido003.gzcb.com:9092"); properties.setProperty("group.id", "flink"); String topic_1 = "bps-16-r3p3"; String topic_2 = "bps-16-r3p4"; DataStreamSource<String> topic1 = env.addSource(new FlinkKafkaConsumer010<String>(topic_1, new SimpleStringSchema(), 
properties)); SingleOutputStreamOperator<Tuple3<String, String, String>> kafkaSource1 = topic1.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { try { BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class); if ("app_case".equals(binLogBean.getTableName())){ return true; }else { return false; } }catch (Exception e){ log.error("JSON转换失败,str={}", value, e); return false; } } }).map(new MapFunction<String, Tuple3<String, String, String>>() { @Override public Tuple3<String, String, String> map(String s) throws Exception { BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class); String case_id = BinLogUtil.getValueByField(binLogBean, "case_id"); String close_time = BinLogUtil.getValueByField(binLogBean, "close_time"); String approve_result = BinLogUtil.getValueByField(binLogBean, "approve_result"); return new Tuple3<String, String, String>(case_id, close_time, approve_result); } }); tEnv.registerDataStream("app_case", kafkaSource1, "case_id, close_time, approve_result"); DataStreamSource<String> topic2 = env.addSource(new FlinkKafkaConsumer010<String>(topic_2, new SimpleStringSchema(), properties)); SingleOutputStreamOperator<Tuple2<String, String>> kafkaSource2 = topic2.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { try { BinLogBean binLogBean = JSONObject.parseObject(value, BinLogBean.class); if ("cm_customer".equals(binLogBean.getTableName())){ return true; }else { return false; } }catch (Exception e){ log.error("JSON转换失败,str={}", value, e); return false; } } }).map(new MapFunction<String, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(String s) throws Exception { BinLogBean binLogBean = JSONObject.parseObject(s, BinLogBean.class); String case_id = BinLogUtil.getValueByField(binLogBean, "case_id"); String idtfno = BinLogUtil.getValueByField(binLogBean, "idtfno"); return new Tuple2<String, String>(case_id, 
idtfno); } }); tEnv.registerDataStream("cm_customer", kafkaSource2, "case_id, idtfno"); Table result = tEnv.sqlQuery("select a.*,b.idtfno " + "from app_case a left join cm_customer b on a.case_id = b.case_id " + "where a.close_time not in('')"); tEnv.toRetractStream(result, Row.class, queryConfig).filter(new FilterFunction<Tuple2<Boolean, Row>>() { @Override public boolean filter(Tuple2<Boolean, Row> booleanRowTuple2) throws Exception { return booleanRowTuple2.f0; } }).print(); env.execute(); } } ``` BinLogBean.java ``` public class BinLogBean implements Serializable{ private String instance; private int version; private Long serverId; private String executeTime; private String logfileName; private Long logfileOffset; /** * database name */ private String schemaName; private String tableName; private String eventType; private List<ColumnField> columnFieldsList; public String getInstance() { return instance; } public void setInstance(String instance) { this.instance = instance; } public List<ColumnField> getColumnFieldsList() { return columnFieldsList; } public void setColumnFieldsList(List<ColumnField> columnFieldsList) { this.columnFieldsList = columnFieldsList; } public int getVersion() { return version; } public void setVersion(int version) { this.version = version; } public Long getServerId() { return serverId; } public void setServerId(Long serverId) { this.serverId = serverId; } public String getExecuteTime() { return executeTime; } public void setExecuteTime(String executeTime) { this.executeTime = executeTime; } public String getLogfileName() { return logfileName; } public void setLogfileName(String logfileName) { this.logfileName = logfileName; } public Long getLogfileOffset() { return logfileOffset; } public void setLogfileOffset(Long logfileOffset) { this.logfileOffset = logfileOffset; } public String getSchemaName() { return schemaName; } public void setSchemaName(String schemaName) { this.schemaName = schemaName; } public String getTableName() { return 
tableName; } public void setTableName(String tableName) { this.tableName = tableName; } public String getEventType() { return eventType; } public void setEventType(String eventType) { this.eventType = eventType; } } ``` -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |