Data not getting passed between operators

Data not getting passed between operators

Ramya Ramamurthy
Hi

I have a setup with Flink 1.7, Kafka 0.11 and Elasticsearch 6.5.

I can see the Flink Kafka consumer consuming messages, but they are not being
passed on to the next stage, i.e. the Elasticsearch sink. I am unable to find
any logs relevant to this.

Logs from my Kafka consumer:

2019-01-16 17:28:05,860 DEBUG
org.apache.flink.kafka011.shaded.org.apache.kafka.clients.consumer.KafkaConsumer
- Committing offsets:
{legitimatestream-16=OffsetAndMetadata{offset=203252637, metadata=''},
legitimatestream-18=OffsetAndMetadata{offset=203236011, metadata=''},
legitimatestream-20=OffsetAndMetadata{offset=203237190, metadata=''},
legitimatestream-22=OffsetAndMetadata{offset=203273504, metadata=''},
legitimatestream-8=OffsetAndMetadata{offset=203251672, metadata=''},
legitimatestream-10=OffsetAndMetadata{offset=203235871, metadata=''},
legitimatestream-12=OffsetAndMetadata{offset=203242970, metadata=''},
legitimatestream-14=OffsetAndMetadata{offset=203269129, metadata=''},
legitimatestream-0=OffsetAndMetadata{offset=203247420, metadata=''},
legitimatestream-9=OffsetAndMetadata{offset=203226435, metadata=''},
legitimatestream-11=OffsetAndMetadata{offset=203259207, metadata=''},
legitimatestream-13=OffsetAndMetadata{offset=203262566, metadata=''},
legitimatestream-1=OffsetAndMetadata{offset=203230950, metadata=''},
legitimatestream-3=OffsetAndMetadata{offset=203260289, metadata=''},
legitimatestream-5=OffsetAndMetadata{offset=203285827, metadata=''},
legitimatestream-24=OffsetAndMetadata{offset=203240761, metadata=''},
legitimatestream-26=OffsetAndMetadata{offset=203254649, metadata=''},
legitimatestream-28=OffsetAndMetadata{offset=203265863, metadata=''}}

I am unable to see any logs about this data being passed to the next stage,
which is the table query, followed by the sink.

Can anyone help me understand why this might happen, or am I missing
something?

Here is the snippet of my code:
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

tableEnv.connect(new Kafka()
        .version("0.11")
        .topic(params.getRequired("read-topic"))
        .property("auto.offset.reset", "latest")
        .property("group.id", params.getRequired("group.id"))
        .property("bootstrap.servers", params.getRequired("bootstrap.servers")))
    .withSchema(new Schema()
        .field("sid", Types.STRING())
        .field("_zpsbd6", Types.STRING())
        .field("r1", Types.STRING())
        .field("r2", Types.STRING())
        .field("r5", Types.STRING())
        .field("r10", Types.STRING())
        .field("isBot", Types.BOOLEAN())
        .field("botcode", Types.STRING())
        .field("ts", Types.SQL_TIMESTAMP())
        /*.field("sensor", Types.STRING())
        .field("temp", Types.LONG())
        .field("ts", Types.SQL_TIMESTAMP())*/
        .rowtime(new Rowtime()
            .timestampsFromField("_zpsbda")
            .watermarksPeriodicBounded(5000)))
    .withFormat(new Json().deriveSchema())
    .inAppendMode()
    .registerTableSource("sourceTopic");

Table query = tableEnv.sqlQuery(
    "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, " +
    "TUMBLE_START(ts, INTERVAL '1' MINUTE) as tumbleStart, " +
    "TUMBLE_END(ts, INTERVAL '1' MINUTE) as tumbleEnd " +
    "FROM sourceTopic " +
    "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' " +
    "GROUP BY TUMBLE(ts, INTERVAL '1' MINUTE), sid, _zpsbd6");

TypeInformation<Row> schema = query.getSchema().toRowType();
SerializationSchema<Row> serializationSchema = new JsonRowSerializationSchema(schema);

DataStream<Row> ds = tableEnv.toAppendStream(query, Row.class);
ds.print();

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));

ElasticsearchSink.Builder<Row> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<Row>() {

            public IndexRequest createIndexRequest(Row row) {
                byte[] document = serializationSchema.serialize(row);
                return new IndexRequest("prod", "logs")
                        .source(document, XContentType.JSON);
            }

            @Override
            public void process(Row r, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {
                System.out.println(r);
                requestIndexer.add(createIndexRequest(r));
            }
        });

esSinkBuilder.setBulkFlushMaxActions(1);

ds.addSink(esSinkBuilder.build());
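
In case it helps with localizing the problem, here is a minimal debugging probe
(just a sketch, not part of the job above) that I could wire in next to the
query to check whether any rows come out of the Kafka table source at all,
independently of the windowed aggregation:

// Hypothetical probe, not in the job yet: scan the registered source table and
// print every raw row, bypassing the tumbling-window query. If nothing prints
// here, the source / JSON deserialization is the problem; if raw rows do print
// while the windowed query stays silent, the event-time window (rowtime taken
// from "_zpsbda" and its watermarks) is the place to look.
Table raw = tableEnv.scan("sourceTopic");
DataStream<Row> rawRows = tableEnv.toAppendStream(raw, Row.class);
rawRows.print();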


Thanks,
~Ramya.

Re: Data not getting passed between operators

Dominik Wosiński
Hey,
Could you please format your snippet? It is very hard to understand what is
going on in there.
Best Regards,
Dom.
