Side Outputs for late arriving records

Side Outputs for late arriving records

Ramya Ramamurthy
Hi,

I have a query regarding late-arriving records.
We are using Flink 1.7 with Kafka consumers of version 2.11.0.11.
In my sink operator, which converts this table to a stream that is pushed
to Elasticsearch, I can see the metric *numLateRecordsDropped*.

My Kafka consumers don't seem to have any lag and the events are processed
properly. Routing these events to a side output doesn't seem to be possible
with tables. Below is the snippet:

        tableEnv.connect(new Kafka()
          /* setting of all kafka properties */
               .startFromLatest())
               .withSchema(new Schema()
                       .field("sid", Types.STRING())
                       .field("_zpsbd6", Types.STRING())
                       .field("r1", Types.STRING())
                       .field("r2", Types.STRING())
                       .field("r5", Types.STRING())
                       .field("r10", Types.STRING())
                       .field("isBot", Types.BOOLEAN())
                       .field("botcode", Types.STRING())
                       .field("ts", Types.SQL_TIMESTAMP())
                       .rowtime(new Rowtime()
                               .timestampsFromField("recvdTime")
                               .watermarksPeriodicBounded(10000)
                       )
               )
               .withFormat(new Json().deriveSchema())
               .inAppendMode()
               .registerTableSource("sourceTopic");

       String sql = "SELECT sid, _zpsbd6 AS ip, COUNT(*) AS total_hits, "
               + "TUMBLE_START(ts, INTERVAL '5' MINUTE) AS tumbleStart, "
               + "TUMBLE_END(ts, INTERVAL '5' MINUTE) AS tumbleEnd "
               + "FROM sourceTopic "
               + "WHERE r1='true' OR r2='true' OR r5='true' OR r10='true' AND isBot='true' "
               + "GROUP BY TUMBLE(ts, INTERVAL '5' MINUTE), sid, _zpsbd6";

Table source = tableEnv.sqlQuery(sql); ---> This is where the metric shows
numLateRecordsDropped, while executing the GROUP BY operation.

Is there a way to get a side output of this, to be able to debug better?

Thanks,
~Ramya.

Re: Side Outputs for late arriving records

Fabian Hueske
Hi Ramya,

This would be a great feature, but unfortunately it is not supported (yet)
by Flink SQL.
Currently, all late records are dropped.

A workaround is to ingest the stream as a DataStream, have a custom
operator route all late records to a side output, and register the
DataStream without late records as a table on which the SQL query is
evaluated.
This requires quite a bit of boilerplate code, but it could be hidden in a
util class.
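
For illustration only (not from this thread), the wiring could look roughly
like the following sketch. The Event type, the splitLateRecords() factory,
and the field list are placeholders:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.util.OutputTag;

// Tag for late records (anonymous subclass so Flink captures the type).
final OutputTag<Event> lateTag = new OutputTag<Event>("late-data") {};

// splitLateRecords() is a hypothetical factory for a ProcessFunction that
// routes records whose timestamp is behind the current watermark to lateTag.
SingleOutputStreamOperator<Event> onTime = events.process(splitLateRecords(lateTag));

// The late records, available for logging or a dead-letter sink.
DataStream<Event> late = onTime.getSideOutput(lateTag);

// Register the stream without late records as a table for the SQL query.
tableEnv.registerDataStream("sourceTopic", onTime, "sid, recvdTime.rowtime");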

Best, Fabian


Re: Side Outputs for late arriving records

Ramya Ramamurthy
Hi,

We were trying to collect the side output, but failed to understand how to
convert this windowed stream back to a DataStream.

final OutputTag<Tuple6<String, String, String, String, String, Timestamp>> lateOutputTag =
        new OutputTag<Tuple6<String, String, String, String, String, Timestamp>>("late-data"){};

withTime.keyBy(0, 2)
        .window(TumblingEventTimeWindows.of(Time.minutes(5)))
        .allowedLateness(Time.minutes(1))
        .sideOutputLateData(lateOutputTag);

I would now have a windowed stream, with the late records tagged as
lateOutputTag. How do we convert the records which are not late back to a
DataStream? Do we need to use the .apply() function to collect this data?
We are quite unsure of this. Appreciate your help.

Best Regards,




Re: Side Outputs for late arriving records

Fabian Hueske
Hi Ramya,

This works by calling getSideOutput() on the main output of the window
function.
The main output is obtained by applying a function on the window.

DataStream<Input> input = ...
OutputTag<Input> lateTag = ...

DataStream<Result> mainResult = input
  .keyBy(...)
  .window(...)
  .sideOutputLateData(lateTag)  // late records go to the side output
  .apply(yourFunction);         // main output: the window results

// The late records are retrieved from the main output.
DataStream<Input> lateRecords = mainResult.getSideOutput(lateTag);
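
If the window function only needs to forward the records unchanged, a
minimal pass-through WindowFunction could look like this sketch
(illustration only; the Tuple6 element type is taken from the earlier
snippet, the rest is hypothetical):

import java.sql.Timestamp;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Emits every record of the window unchanged; the key type is Tuple
// because keyBy(0, 2) was used with field positions.
public class PassThroughWindowFunction implements
        WindowFunction<Tuple6<String, String, String, String, String, Timestamp>,
                       Tuple6<String, String, String, String, String, Timestamp>,
                       Tuple, TimeWindow> {

    @Override
    public void apply(Tuple key, TimeWindow window,
                      Iterable<Tuple6<String, String, String, String, String, Timestamp>> input,
                      Collector<Tuple6<String, String, String, String, String, Timestamp>> out) {
        for (Tuple6<String, String, String, String, String, Timestamp> record : input) {
            out.collect(record);
        }
    }
}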

Best, Fabian


Re: Side Outputs for late arriving records

Ramya Ramamurthy
Hi Fabian,

I do not have anything to put in "yourFunction".
The documentation says the .apply() WindowFunction is legacy. But I am at a
loss to understand which of reduce, aggregate, fold, or apply I must use,
as I hardly have any operations to perform; I only need the stream with no
late data returned to me [so that I can construct a Flink table from this
data and do my processing there].

~Ramya.




Re: Side Outputs for late arriving records

Fabian Hueske
Hi Ramya,

If you don't want to apply any logic but just filter out late records, you
should not use a window, because it needs to shuffle and group records into
windows.
Instead, you can use a non-keyed ProcessFunction, compare the timestamp of
each record (context.timestamp()) with the current watermark
(context.timerService().currentWatermark()), and emit all records that are
late to a side output.
This avoids the shuffle and reduces processing latency.
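
A minimal sketch of such a splitter (illustration only; the Event type and
class name are hypothetical, not from this thread):

import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

// Routes records that are behind the current watermark to a side output;
// everything else passes through to the main output.
public class LateSplitter extends ProcessFunction<Event, Event> {

    // Anonymous subclass so Flink can capture the element type.
    public static final OutputTag<Event> LATE = new OutputTag<Event>("late-data") {};

    @Override
    public void processElement(Event value, Context ctx, Collector<Event> out) {
        Long ts = ctx.timestamp();  // event timestamp; null if none was assigned
        if (ts != null && ts < ctx.timerService().currentWatermark()) {
            ctx.output(LATE, value);  // late: route to the side output
        } else {
            out.collect(value);       // on time: forward to the main output
        }
    }
}

The main output of stream.process(new LateSplitter()) can then be
registered as a table, and the late records retrieved with
getSideOutput(LateSplitter.LATE) for debugging.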

Best, Fabian
