Hi
I have a Flink 1.7 setup with Kafka 0.11 and Elasticsearch 6.5.

I can see the Flink Kafka consumer consuming messages, but they do not seem to be passed on to the next stage, i.e. the Elasticsearch sink, and I am unable to find any logs relevant to this.

The only logs I see are about my Kafka consumers:

2019-01-16 17:28:05,860 DEBUG org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer - Committing offsets:
{legitimatestream-16=OffsetAndMetadata{offset=203252637, metadata=''},
 legitimatestream-18=OffsetAndMetadata{offset=203236011, metadata=''},
 legitimatestream-20=OffsetAndMetadata{offset=203237190, metadata=''},
 legitimatestream-22=OffsetAndMetadata{offset=203273504, metadata=''},
 legitimatestream-8=OffsetAndMetadata{offset=203251672, metadata=''},
 legitimatestream-10=OffsetAndMetadata{offset=203235871, metadata=''},
 legitimatestream-12=OffsetAndMetadata{offset=203242970, metadata=''},
 legitimatestream-14=OffsetAndMetadata{offset=203269129, metadata=''},
 legitimatestream-0=OffsetAndMetadata{offset=203247420, metadata=''},
 legitimatestream--9=OffsetAndMetadata{offset=203226435, metadata=''},
 legitimatestream-11=OffsetAndMetadata{offset=203259207, metadata=''},
 legitimatestream-13=OffsetAndMetadata{offset=203262566, metadata=''},
 legitimatestream-1=OffsetAndMetadata{offset=203230950, metadata=''},
 legitimatestream-3=OffsetAndMetadata{offset=203260289, metadata=''},
 legitimatestream-5=OffsetAndMetadata{offset=203285827, metadata=''},
 legitimatestream-24=OffsetAndMetadata{offset=203240761, metadata=''},
 legitimatestream-26=OffsetAndMetadata{offset=203254649, metadata=''},
 legitimatestream-28=OffsetAndMetadata{offset=203265863, metadata=''}}

I cannot see any logs about this data being passed on to the next stage, which is the Table query followed by the sink. Can anyone help me figure out why this might happen, or am I missing something?
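As a quick sanity check (a minimal sketch only, assuming the same tableEnv and the "sourceTopic" table that are set up in the snippet below), I would expect something like this to print rows if data actually reaches the Table layer:

// Hypothetical check, not part of my actual job: select the raw fields
// without the tumbling window and print them, to confirm that rows from
// Kafka make it into the registered table at all.
Table raw = tableEnv.sqlQuery("SELECT sid, _zpsbd6, isBot, ts FROM sourceTopic");
DataStream<Row> rawRows = tableEnv.toAppendStream(raw, Row.class);
rawRows.print();

If rows show up there but the windowed query still emits nothing, that would at least narrow the problem down to the window aggregation or the sink.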
Here is the snippet of my code:

// Register the Kafka topic as a table source, with "ts" as the event-time
// attribute derived from the "_zpsbda" field and bounded watermarks of 5s.
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("read-topic"))
        .property("auto.offset.reset", "latest")
        .property("group.id", params.getRequired("group.id"))
        .property("bootstrap.servers", params.getRequired("bootstrap.servers")))
    .withSchema(new Schema()
        .field("sid", Types.STRING())
        .field("_zpsbd6", Types.STRING())
        .field("r1", Types.STRING())
        .field("r2", Types.STRING())
        .field("r5", Types.STRING())
        .field("r10", Types.STRING())
        .field("isBot", Types.BOOLEAN())
        .field("botcode", Types.STRING())
        .field("ts", Types.SQL_TIMESTAMP())
        // .field("sensor", Types.STRING())
        // .field("temp", Types.LONG())
        // .field("ts", Types.SQL_TIMESTAMP())
        .rowtime(new Rowtime()
            .timestampsFromField("_zpsbda")
            .watermarksPeriodicBounded(5000)))
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSource("sourceTopic");

// One-minute tumbling window over the event-time attribute.
Table query = tableEnv.sqlQuery(
    "SELECT sid, _zpsbd6 AS ip, COUNT(*) AS total_hits, " +
    "TUMBLE_START(ts, INTERVAL '1' MINUTE) AS tumbleStart, " +
    "TUMBLE_END(ts, INTERVAL '1' MINUTE) AS tumbleEnd " +
    "FROM sourceTopic " +
    "WHERE r1='true' OR r2='true' OR r5='true' OR r10='true' AND isBot='true' " +
    "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6");

TypeInformation<Row> schema = query.getSchema().toRowType();
SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

DataStream<Row> ds = tableEnv.toAppendStream(query, Row.class);
ds.print();

// Elasticsearch sink: serialize each result row as JSON into the "prod/logs" index.
List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
    httpHosts,
    new ElasticsearchSinkFunction<Row>() {

        public IndexRequest createIndexRequest(Row row) {
            byte[] document = serializationSchema.serialize(row);
            return new IndexRequest("prod", "logs")
                .source(document, XContentType.JSON);
        }

        @Override
        public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
            System.out.println(r);
            requestIndexer.add(createIndexRequest(r));
        }
    });

esSinkBuilder.setBulkFlushMaxActions(1);
ds.addSink(esSinkBuilder.build());

Thanks,
~Ramya.
Hey,
Could you please format your snippet? It is very hard to understand what is going on in there.

Best Regards,
Dom.