Hi,
I have a question about late-arriving records. We are using Flink 1.7 with Kafka consumers of version 2.11.0.11. In my sink operator, which converts this table to a stream that is pushed to Elasticsearch, I am able to see the metric *numLateRecordsDropped*.

My Kafka consumers don't seem to have any lag, and the events are processed properly. Taking these events to a side output doesn't seem to be possible with tables. Below is the snippet:

    tableEnv.connect(new Kafka()
            /* setting of all kafka properties */
            .startFromLatest())
        .withSchema(new Schema()
            .field("sid", Types.STRING())
            .field("_zpsbd6", Types.STRING())
            .field("r1", Types.STRING())
            .field("r2", Types.STRING())
            .field("r5", Types.STRING())
            .field("r10", Types.STRING())
            .field("isBot", Types.BOOLEAN())
            .field("botcode", Types.STRING())
            .field("ts", Types.SQL_TIMESTAMP())
            .rowtime(new Rowtime()
                .timestampsFromField("recvdTime")
                .watermarksPeriodicBounded(10000)))
        .withFormat(new Json().deriveSchema())
        .inAppendMode()
        .registerTableSource("sourceTopic");

    String sql = "SELECT sid, _zpsbd6 as ip, COUNT(*) as total_hits, "
        + "TUMBLE_START(ts, INTERVAL '5' MINUTE) as tumbleStart, "
        + "TUMBLE_END(ts, INTERVAL '5' MINUTE) as tumbleEnd FROM sourceTopic "
        + "WHERE r1='true' or r2='true' or r5='true' or r10='true' and isBot='true' "
        + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";

    Table source = tableEnv.sqlQuery(sql);

The last line is where the metric shows numLateRecordsDropped, while executing the GROUP BY operation. Is there a way to get a side output of this, to be able to debug better?

Thanks,
~Ramya.
Hi Ramya,
This would be a great feature, but unfortunately it is not supported (yet) by Flink SQL. Currently, all late records are dropped.

A workaround is to ingest the stream as a DataStream, add a custom operator that routes all late records to a side output, and register the DataStream without the late records as a table on which the SQL query is evaluated. This requires quite a bit of boilerplate code, but it could be hidden in a util class.

Best, Fabian
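For reference, a minimal sketch of the final registration step of this workaround, assuming a DataStream<Row> cleanStream from which late records have already been routed to a side output (cleanStream and result are illustrative names, not from the thread; the field list matches the schema above):

    // Register the already-filtered stream as a table. Appending ".rowtime" to ts
    // declares it as the event-time attribute used by the windowed GROUP BY.
    tableEnv.registerDataStream(
        "sourceTopic",
        cleanStream,
        "sid, _zpsbd6, r1, r2, r5, r10, isBot, botcode, ts.rowtime");

    Table result = tableEnv.sqlQuery(sql);  // the same windowed aggregation as before

registerDataStream() is the Flink 1.7 name for this API; later releases replace it with createTemporaryView().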
Hi,
We were trying to collect the side output, but failed to understand how to convert this windowed stream back to a DataStream.

    final OutputTag<Tuple6<String, String, String, String, String, Timestamp>> lateOutputTag =
        new OutputTag<Tuple6<String, String, String, String, String, Timestamp>>("late-data"){};

    withTime.keyBy(0, 2)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .allowedLateness(Time.minutes(1))
        .sideOutputLateData(lateOutputTag);

I would now have a windowed stream with the late records tagged as lateOutputTag. How do I convert the records that are not late back to a DataStream? Do we need to use the .apply function to collect this data? I am quite unsure of this. Appreciate your help.

Best Regards,
Hi Ramya,
This works by calling getSideOutput() on the main output of the window function. The main output is collected by applying a function on the window.

    DataStream<Input> input = ...
    OutputTag<Input> lateTag = ...

    DataStream<Result> mainResult = input
        .keyBy(...)
        .window(...)
        .sideOutputLateData(lateTag)
        .apply(yourFunction);

    DataStream<Input> lateRecords = mainResult.getSideOutput(lateTag);

Best, Fabian
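To make the yourFunction placeholder concrete: below is a hedged sketch with an identity window function that performs no aggregation and simply forwards every record. It uses a simplified Tuple2 record type for readability; substitute the Tuple6 type from the earlier message. All names here are illustrative.

    import org.apache.flink.api.java.tuple.Tuple;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // The anonymous subclass ({}) preserves the generic type information.
    final OutputTag<Tuple2<String, Long>> lateTag =
        new OutputTag<Tuple2<String, Long>>("late-data"){};

    SingleOutputStreamOperator<Tuple2<String, Long>> mainResult = withTime
        .keyBy(0)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .sideOutputLateData(lateTag)
        // Identity window function: forward each record in the window unchanged.
        .apply(new WindowFunction<Tuple2<String, Long>, Tuple2<String, Long>, Tuple, TimeWindow>() {
            @Override
            public void apply(Tuple key, TimeWindow window,
                              Iterable<Tuple2<String, Long>> records,
                              Collector<Tuple2<String, Long>> out) {
                for (Tuple2<String, Long> record : records) {
                    out.collect(record);
                }
            }
        });

    // getSideOutput() must be called on the operator returned by apply().
    DataStream<Tuple2<String, Long>> lateRecords = mainResult.getSideOutput(lateTag);

Note that the on-time records are only emitted when a window fires, so this approach adds windowing latency plus a shuffle from keyBy(); the ProcessFunction approach suggested later in the thread avoids both.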
Hi Fabian,
I do not have anything to put in "yourFunction". The documentation says the .apply WindowFunction is legacy, but I am at a loss to understand which of reduce, aggregate, fold, or apply I must use, as I hardly have any operations to perform; I only want the stream with no late data returned to me, so that I can construct a Flink table from it and do my processing there.

~Ramya.
Hi Ramya,
If you don't want to apply any logic but just filter out late records, you should not use a window, because a window needs to shuffle and group the records. Instead, you can use a non-keyed ProcessFunction, compare the timestamp of each record (context.timestamp()) with the current watermark (context.timerService().currentWatermark()), and emit all records that are late to a side output. This avoids the shuffle and reduces processing latency.

Best, Fabian
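A minimal sketch of that approach, assuming event-time timestamps and watermarks have been assigned upstream. The class and variable names are illustrative; depending on the record type, an explicit .returns(...) type hint may be needed on the process() call because the function class is generic.

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.functions.ProcessFunction;
    import org.apache.flink.types.Row;
    import org.apache.flink.util.Collector;
    import org.apache.flink.util.OutputTag;

    // Routes late records to a side output; on-time records stay on the main output.
    // Non-keyed, so it introduces no shuffle.
    public class LateRecordSplitter<T> extends ProcessFunction<T, T> {

        private final OutputTag<T> lateTag;

        public LateRecordSplitter(OutputTag<T> lateTag) {
            this.lateTag = lateTag;
        }

        @Override
        public void processElement(T record, Context ctx, Collector<T> out) {
            Long ts = ctx.timestamp();
            // A record is late if the watermark has already passed its timestamp.
            if (ts != null && ts <= ctx.timerService().currentWatermark()) {
                ctx.output(lateTag, record);
            } else {
                out.collect(record);
            }
        }
    }

Used on a DataStream<Row> (here called withTime, as in the earlier snippet):

    OutputTag<Row> lateTag = new OutputTag<Row>("late-data"){};
    SingleOutputStreamOperator<Row> cleaned = withTime.process(new LateRecordSplitter<>(lateTag));
    DataStream<Row> lateRecords = cleaned.getSideOutput(lateTag);  // inspect these for debugging

    // "cleaned" can now be registered as a table, as sketched after the first reply.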