Hello everyone,
I am currently experimenting with streams whose timestamps are defined by event time. I currently have the following code:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.getConfig().enableTimestamps(); //.setAutoWatermarkInterval(10000);
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    DataStream<String> input = env.readTextFile("file:///var/log/syslog");
    input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());

    input.timeWindowAll(Time.minutes(5)).apply(new AllWindowFunction<Iterable<String>, String, TimeWindow>() {
        @Override
        public void apply(TimeWindow window, Iterable<String> values, Collector<String> out) throws Exception {
            for (String t : values) {
                out.collect(t);
            }
        }
    }).print();

    (...)

    public static final class AssignTimestampFromLogEvent extends AscendingTimestampExtractor<String> {
        @Override
        public long extractAscendingTimestamp(String element, long previousElementTimestamp) {
            String date = element.substring(0, 15);
            SimpleDateFormat sdf = new SimpleDateFormat("MMM dd HH:mm:ss");
            Date ddate = null;
            try {
                ddate = sdf.parse(date);
            } catch (ParseException e) {
                e.printStackTrace();
            }
            return ddate.getTime();
        }
    }

What I expect it to do is read the syslog, assign timestamps, and compute 5-minute windows *based on the syslog event time*, as I have configured the stream to do. However, it does not: the windows are computed based on processing time.

What am I missing here?

Best regards,

-- 
*Nam-Luc TRAN*
R&D Manager
EURA NOVA
(M) +32 498 37 36 23
*euranova.eu <http://euranova.eu>*
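As posted, the extractor calls ddate.getTime() even when parsing failed, so any line that does not start with a parsable date would end in a NullPointerException after the stack trace is printed. The standalone sketch below (plain Java, no Flink) shows one way to make the parsing step null-safe. Locale.ENGLISH, the UTC time zone, the class name and the caller-supplied fallback are assumptions for illustration, not part of the original code; note also that the pattern has no year, so parsed dates fall in 1970.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.TimeZone;

public class SyslogTimestampSketch {

    // Classic syslog prefix such as "Feb 25 18:14:03" (15 characters, no year).
    private static final String PATTERN = "MMM dd HH:mm:ss";

    /**
     * Hypothetical helper: returns the epoch millis of the syslog prefix,
     * or the given fallback if the line is too short or cannot be parsed.
     * Locale.ENGLISH (syslog month names are English) and UTC are assumptions.
     */
    static long parseOrDefault(String line, long fallback) {
        if (line == null || line.length() < 15) {
            return fallback;
        }
        // SimpleDateFormat is not thread-safe, so a new instance is created per call.
        SimpleDateFormat sdf = new SimpleDateFormat(PATTERN, Locale.ENGLISH);
        sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        try {
            return sdf.parse(line.substring(0, 15)).getTime();
        } catch (ParseException e) {
            return fallback; // never dereference a null Date
        }
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("Feb 25 18:14:03 host kernel: hello", -1L)); // 4817643000
        System.out.println(parseOrDefault("not a syslog line at all", -1L));           // -1
    }
}
```

The first value is 1970-02-25T18:14:03Z in milliseconds, since the missing year defaults to 1970.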
Hi,
I had a similar issue recently. Instead of

    input.assignTimestampsAndWatermarks

you have to do:

    input = input.assignTimestampsAndWatermarks

On Thu, Feb 25, 2016 at 6:14 PM, Nam-Luc Tran wrote:
> input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());
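Robert's fix works because assignTimestampsAndWatermarks returns a new stream carrying the extracted timestamps and watermarks; it does not modify input in place, so the original code built its windows on a stream that never passed through the extractor. The same pattern can be shown in plain Java with String.toLowerCase(), which also returns a new object. The String example is only an analogy, and the class and method names are made up for illustration.

```java
public class ReassignSketch {

    // Calls a method that returns a new value but ignores that value.
    static String discardResult(String line) {
        line.toLowerCase(); // a new String is created and thrown away; line is unchanged
        return line;
    }

    // Keeps the returned value, the same pattern as
    // input = input.assignTimestampsAndWatermarks(new AssignTimestampFromLogEvent());
    static String keepResult(String line) {
        line = line.toLowerCase();
        return line;
    }

    public static void main(String[] args) {
        System.out.println(discardResult("Feb 25 HOST")); // Feb 25 HOST
        System.out.println(keepResult("Feb 25 HOST"));    // feb 25 host
    }
}
```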
Great, that did it, thanks Robert ;)
While I'm at it: sometimes results are returned correctly, but sometimes the output of the job (print or writeAsText) is completely empty, as if the job finished too quickly for the results to be written. One way of "forcing" results to appear is to insert a "delay" in the source stream, for example with a FlatMap:

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Thread.sleep(1);
        out.collect(value.toLowerCase());
    }

Am I missing anything here?

Best regards,

2016-02-25 20:05 GMT+01:00 Robert Metzger wrote:
> input = input.assignTimestampsAndWatermarks
Hi,
I think the problem is that the source finishes before the extractor has had the chance to emit even a single watermark. The topology then shuts down, and the window operator does not emit in-flight windows upon shutdown, so nothing is printed.

Cheers,
Aljoscha

> On 26 Feb 2016, at 11:40, Nam-Luc Tran wrote:
> Sometimes results are correctly returned, sometimes, the output of the job (print or writeAsText) is plain empty
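Aljoscha's explanation, and why the Thread.sleep(1) workaround appears to help, can be illustrated with a small virtual-clock model. It assumes that the extractor's watermarks are emitted only on a periodic timer (the interval the question's commented-out setAutoWatermarkInterval call would set) and that nothing is emitted when the source ends. The per-record cost and the 200 ms interval are made-up numbers, not Flink defaults, and the class is hypothetical, not Flink code.

```java
public class PeriodicWatermarkSketch {

    /**
     * Virtual-clock model: counts how many periodic watermarks would be
     * emitted while a bounded source reads its records.
     *
     * @param records         number of records in the bounded input
     * @param perRecordMillis hypothetical processing time per record
     * @param intervalMillis  hypothetical watermark interval
     */
    static int watermarksBeforeShutdown(int records, long perRecordMillis, long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        long clock = 0;
        long nextTick = intervalMillis;
        int emitted = 0;
        for (int i = 0; i < records; i++) {
            clock += perRecordMillis;   // time spent on this record
            while (clock >= nextTick) { // the periodic timer fired
                emitted++;
                nextTick += intervalMillis;
            }
        }
        // The source is finished here; in this model no further watermark is emitted.
        return emitted;
    }

    public static void main(String[] args) {
        // A fast file read: the timer never fires, so no window can be triggered.
        System.out.println(watermarksBeforeShutdown(1000, 0, 200)); // 0
        // With a 1 ms "sleep" per record, the run lasts long enough for the timer to fire.
        System.out.println(watermarksBeforeShutdown(1000, 1, 200)); // 5
    }
}
```

Because the outcome depends on how long the job runs relative to the timer, the same job can produce output on one run and nothing on another, which matches the intermittent behaviour described above.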
Nice catch, actually.
I think we should let the timestamp-extracting operator emit its current watermark before shutting down.

On Fri, Feb 26, 2016 at 11:49 AM, Aljoscha Krettek wrote:
> I think the problem is that the source finished before the extractor has the chance to emit even a single watermark.
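The proposal can be sketched with a simplified model of the 5-minute tumbling windows from the question. Assumptions, not taken from Flink's source: a window [start, start + size) fires once the watermark reaches start + size - 1, and an ascending extractor's current watermark is the last timestamp minus one. The timestamps are made-up values in milliseconds, and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

public class ShutdownWatermarkSketch {

    static final long WINDOW_MILLIS = 5 * 60 * 1000L; // 5-minute tumbling windows, as in the question

    // Pending windows (start -> element count) and the starts of windows that have fired.
    static final class TumblingWindows {
        final TreeMap<Long, Integer> pending = new TreeMap<>();
        final List<Long> firedStarts = new ArrayList<>();

        void add(long ts) {
            long start = ts - Math.floorMod(ts, WINDOW_MILLIS);
            pending.merge(start, 1, Integer::sum);
        }

        // A window [start, start + size) fires once the watermark reaches start + size - 1.
        void onWatermark(long watermark) {
            while (!pending.isEmpty() && pending.firstKey() + WINDOW_MILLIS - 1 <= watermark) {
                firedStarts.add(pending.pollFirstEntry().getKey());
            }
        }
    }

    /**
     * Feeds ascending timestamps without any periodic watermark (the source
     * finished first), then optionally delivers a single watermark on shutdown.
     */
    static List<Long> run(long[] ascendingTimestamps, Long shutdownWatermark) {
        TumblingWindows windows = new TumblingWindows();
        for (long ts : ascendingTimestamps) {
            windows.add(ts);
        }
        if (shutdownWatermark != null) {
            windows.onWatermark(shutdownWatermark);
        }
        return windows.firedStarts;
    }

    public static void main(String[] args) {
        long[] ts = {0L, 60_000L, 310_000L, 620_000L};
        System.out.println(run(ts, null));                  // []  (no watermark: nothing fires)
        System.out.println(run(ts, ts[ts.length - 1] - 1)); // [0, 300000]
        System.out.println(run(ts, Long.MAX_VALUE));        // [0, 300000, 600000]
    }
}
```

In this model, emitting only the current watermark on shutdown flushes every window except the one holding the last element; that window fires only on a watermark past its end, such as Long.MAX_VALUE.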