[DISCUSS] FLIP-134: DataStream Semantics for Bounded Input


[DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kostas Kloudas-5
Hi all,

As described in FLIP-131 [1], we are aiming at deprecating the DataSet
API in favour of the DataStream API and the Table API. After this work
is done, the user will be able to write a program using the DataStream
API and this will execute efficiently on both bounded and unbounded
data. But before we reach this point, it is worth discussing and
agreeing on the semantics of some operations as we transition from the
streaming world to the batch one.

This thread and the associated FLIP [2] aim at discussing these issues
as these topics are pretty important to users and can lead to
unpleasant surprises if we do not pay attention.

Let's have a healthy discussion here and I will be updating the FLIP
accordingly.

Cheers,
Kostas

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
[2] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kurt Young
Hi Kostas,

Thanks for starting this discussion. The first part of this FLIP: "Batch vs
Streaming Scheduling" looks reasonable to me.
However, there is another dimension I think we should also take into
consideration, which is whether checkpointing is enabled.

This option is mostly, though not fully, orthogonal to the boundedness and
persistence of the input. For example, consider an arbitrary operator
that uses state: we can enable checkpointing to achieve better failure
recovery if the input is bounded and pipelined. And if the input
is bounded and persistent, we can still use checkpointing, but we might
need to checkpoint the offset into the intermediate result set consumed
by the operator. That would require much more work, so we can defer it to
the future.

Beyond this dimension, there is another question to be asked: if the
topology mixes bounded and unbounded inputs, what should the behavior
be? E.g., a join operator with one input bounded
and the other unbounded. Can we still use BATCH or
STREAMING to define the scheduling policy? What kind of failure recovery
guarantee can Flink provide to users?

I don't have clear answers for now; I just want to raise these questions
to seek some discussion.
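To make the two dimensions concrete, here is a minimal sketch of which input/checkpointing combinations seem feasible today versus deferred, as described above. All names are hypothetical illustrations for this discussion, not Flink APIs:

```java
// Models the two dimensions raised here: boundedness/persistence of the
// input vs. whether checkpointing can be supported without further work.
class CheckpointFeasibility {
    enum Input { BOUNDED_PIPELINED, BOUNDED_PERSISTENT, UNBOUNDED }

    /** Can checkpointing be supported today, without extra machinery? */
    static boolean checkpointingSupportedNow(Input input) {
        switch (input) {
            case UNBOUNDED:          return true;  // classic streaming case
            case BOUNDED_PIPELINED:  return true;  // bounded input, STREAMING scheduling
            case BOUNDED_PERSISTENT: return false; // would need to checkpoint offsets
                                                   // into intermediate result sets
            default: throw new IllegalArgumentException("unknown input: " + input);
        }
    }

    public static void main(String[] args) {
        for (Input i : Input.values()) {
            System.out.println(i + " -> " + checkpointingSupportedNow(i));
        }
    }
}
```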

Best,
Kurt



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

David Anderson-3
In reply to this post by Kostas Kloudas-5
Kostas,

I'm pleased to see some concrete details in this FLIP.

I wonder if the current proposal goes far enough in the direction of
recognizing the need some users may have for "batch" and "bounded
streaming" to be treated differently. If I've understood it correctly, the
section on scheduling allows me to choose STREAMING scheduling even if I
have bounded sources. I like that approach, because it recognizes that even
though I have bounded inputs, I don't necessarily want batch processing
semantics. I think it makes sense to extend this idea to processing time
support as well.

My thinking is that sometimes in development and testing it's reasonable to
run exactly the same job as in production, except with different sources
and sinks. While it might be a reasonable default, I'm not convinced that
switching a processing time streaming job to read from a bounded source
should always cause it to fail.

David


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kostas Kloudas-5
Hi Kurt and David,

Thanks a lot for the insightful feedback!

@Kurt: For the topic of checkpointing with Batch Scheduling, I totally
agree with you that it requires a lot more work and careful thinking
on the semantics. This FLIP was written under the assumption that if
the user wants to have checkpoints on bounded input, he/she will have
to go with STREAMING as the scheduling mode. Checkpointing for BATCH
can be handled as a separate topic in the future.

In the case of MIXED workloads and for this FLIP, the scheduling mode
should be set to STREAMING. That is why the AUTOMATIC option sets
scheduling to BATCH only if all the sources are bounded. I am not sure
what the plans are at the scheduling level, but one could imagine
that in the future, for mixed workloads, we first schedule all the
bounded subgraphs in BATCH mode and allow only one UNBOUNDED
subgraph per application, which is scheduled after all the
bounded ones have finished. Essentially, the bounded subgraphs would be
used to bootstrap the unbounded one. But I am not aware of any plans
in that direction.
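A minimal model of the AUTOMATIC rule described above (BATCH if and only if every source is bounded); the types are illustrative stand-ins, not Flink classes:

```java
import java.util.List;

// AUTOMATIC mode selection: BATCH only when all sources are bounded,
// STREAMING otherwise (covering the mixed-workload case).
class ModeSelection {
    enum Mode { BATCH, STREAMING }

    /** boundedSources holds, per source, whether that source is bounded. */
    static Mode select(List<Boolean> boundedSources) {
        boolean allBounded = boundedSources.stream().allMatch(b -> b);
        return allBounded ? Mode.BATCH : Mode.STREAMING;
    }

    public static void main(String[] args) {
        System.out.println(select(List.of(true, true)));  // prints BATCH
        System.out.println(select(List.of(true, false))); // prints STREAMING (mixed)
    }
}
```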


@David: The processing time timer handling is a topic that has also
been discussed in the community in the past, and I do not remember any
final conclusion unfortunately.

In the current context and for bounded input, we chose to favor
reproducibility of the result, as this is expected in batch processing
where the whole input is available in advance. This is why this
proposal suggests not allowing processing time timers. But I
understand your argument that the user may want to be able to run the
same pipeline on batch and streaming; this is why we added the two
options under future work, namely (from the FLIP):

```
Future Work: In the future we may consider adding as options the capability of:
* firing all the registered processing time timers at the end of a job
(at close()) or,
* ignoring all the registered processing time timers at the end of a job.
```

Conceptually, we are saying that batch execution is assumed to be
instantaneous and to refer to a single "point" in time, so any
processing-time timers registered for the future may fire at the end
of execution or be ignored (but should not throw an exception). I
could also see ignoring the timers in batch as the default, if that
makes more sense.
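The two end-of-input options above could be modeled roughly as follows; the names are hypothetical and not part of the FLIP:

```java
import java.util.ArrayList;
import java.util.List;

// End-of-input policy for processing-time timers in a bounded job:
// either fire everything still registered, or silently drop it.
class EndOfInputTimers {
    enum Policy { FIRE_AT_END, IGNORE }

    /** Returns the timers (as timestamps) that actually fire at end of input. */
    static List<Long> drain(List<Long> registered, Policy policy) {
        if (policy == Policy.IGNORE) {
            return List.of(); // silently dropped, never an exception
        }
        return new ArrayList<>(registered); // all pending timers fire at close()
    }

    public static void main(String[] args) {
        List<Long> pending = List.of(1_000L, 2_000L);
        System.out.println(drain(pending, Policy.FIRE_AT_END)); // prints [1000, 2000]
        System.out.println(drain(pending, Policy.IGNORE));      // prints []
    }
}
```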

By the way, do you have any use cases in mind that would help us better
shape our processing time timer handling?

Kostas


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Yun Gao
Hi,

    Many thanks for bringing up this discussion!

    One more question: do the BATCH and STREAMING modes also decide the shuffle types and operator behavior? I am asking because even in blocking mode, a job could benefit from keeping some edges pipelined if resources are known to be sufficient. Do we also consider exposing more fine-grained control over the shuffle types?

Best,
 Yun




Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

dwysakowicz

Hi all,

@Klou Nice write up. One comment: I would suggest using a different configuration parameter name. The way I understand the proposal, BATCH/STREAMING/AUTOMATIC affects not only the scheduling mode but the types of shuffles as well. How about `execution.mode`? Or `execution-runtime-mode`?

@Yun The way I understand it

BATCH = pipelined scheduling with region failover + blocking keyBy shuffles (all pointwise shuffles pipelined)

STREAM = eager scheduling with checkpointing + pipelined keyBy shuffles

AUTOMATIC = choose based on source

Power users could still override shuffle modes via PartitionTransformation. If we find more people interested in controlling the type of shuffles, we can think about exposing that in the DataStream API as well in the future.
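The mapping above can be written out as a small lookup; the enum values are descriptive placeholders for this discussion, not actual configuration values:

```java
// Per-mode properties as summarized in this message: BATCH uses pipelined
// scheduling with region failover and blocking keyBy shuffles; STREAMING
// uses eager scheduling with checkpointing and pipelined keyBy shuffles.
class ModeProperties {
    enum Mode { BATCH, STREAMING }
    enum Scheduling { PIPELINED_REGION_FAILOVER, EAGER_WITH_CHECKPOINTING }
    enum KeyByShuffle { BLOCKING, PIPELINED }

    static Scheduling schedulingFor(Mode m) {
        return m == Mode.BATCH ? Scheduling.PIPELINED_REGION_FAILOVER
                               : Scheduling.EAGER_WITH_CHECKPOINTING;
    }

    static KeyByShuffle keyByShuffleFor(Mode m) {
        return m == Mode.BATCH ? KeyByShuffle.BLOCKING : KeyByShuffle.PIPELINED;
    }

    public static void main(String[] args) {
        for (Mode m : Mode.values()) {
            System.out.println(m + ": " + schedulingFor(m) + ", " + keyByShuffleFor(m));
        }
    }
}
```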

Best,

Dawid


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kostas Kloudas-5
Hi Yun and Dawid,

Dawid is correct in that:
```
BATCH = pipelined scheduling with region failover + blocking keyBy
shuffles (all pointwise shuffles pipelined)
STREAM = eager scheduling with checkpointing + pipelined keyBy shuffles
AUTOMATIC = choose based on sources (ALL bounded == BATCH, STREAMING otherwise)
```

As for allowing users to set the shuffle mode, we can consider it, but
I think we should be careful, because if checkpointing is enabled (e.g.
in STREAMING) then introducing a blocking shuffle will cause
problems. The opposite, allowing pipelined execution for keyBys in
BATCH, may be OK.
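The compatibility rule described here could be sketched as a simple check (hypothetical helper, not Flink code):

```java
// Checkpoint barriers cannot flow through a blocking edge, so a blocking
// shuffle under enabled checkpointing is rejected; every other combination
// (including pipelined shuffles in BATCH) is tentatively allowed.
class ShuffleValidation {
    static boolean isAllowed(boolean checkpointingEnabled, boolean blockingShuffle) {
        return !(checkpointingEnabled && blockingShuffle);
    }

    public static void main(String[] args) {
        System.out.println(isAllowed(true, true));  // prints false
        System.out.println(isAllowed(false, true)); // prints true
        System.out.println(isAllowed(true, false)); // prints true
    }
}
```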

Kostas



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

David Anderson-3
In reply to this post by Kostas Kloudas-5
Being able to optionally fire registered processing time timers at the end
of a job would be interesting, and would help in (at least some of) the
cases I have in mind. I don't have a better idea.

David

On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas <[hidden email]> wrote:

> Hi Kurt and David,
>
> Thanks a lot for the insightful feedback!
>
> @Kurt: For the topic of checkpointing with Batch Scheduling, I totally
> agree with you that it requires a lot more work and careful thinking
> on the semantics. This FLIP was written under the assumption that if
> the user wants to have checkpoints on bounded input, he/she will have
> to go with STREAMING as the scheduling mode. Checkpointing for BATCH
> can be handled as a separate topic in the future.
>
> In the case of MIXED workloads and for this FLIP, the scheduling mode
> should be set to STREAMING. That is why the AUTOMATIC option sets
> scheduling to BATCH only if all the sources are bounded. I am not sure
> what are the plans there at the scheduling level, as one could imagine
> in the future that in mixed workloads, we schedule first all the
> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
> subgraph per application, which is going to be scheduled after all
> Bounded ones have finished. Essentially the bounded subgraphs will be
> used to bootstrap the unbounded one. But, I am not aware of any plans
> towards that direction.
>
>
> @David: The processing time timer handling is a topic that has also
> been discussed in the community in the past, and I do not remember any
> final conclusion unfortunately.
>
> In the current context and for bounded input, we chose to favor
> reproducibility of the result, as this is expected in batch processing
> where the whole input is available in advance. This is why this
> proposal suggests to not allow processing time timers. But I
> understand your argument that the user may want to be able to run the
> same pipeline on batch and streaming this is why we added the two
> options under future work, namely (from the FLIP):
>
> ```
> Future Work: In the future we may consider adding as options the
> capability of:
> * firing all the registered processing time timers at the end of a job
> (at close()) or,
> * ignoring all the registered processing time timers at the end of a job.
> ```
>
> Conceptually, we are essentially saying that batch execution is
> assumed to be instantaneous, referring to a single "point" in time,
> and any processing-time timers scheduled for the future may fire at
> the end of execution or be ignored (but not throw an exception). I
> could also see ignoring the timers in batch as the default, if this
> makes more sense.
>
> By the way, do you have any use cases in mind that will help us better
> shape our processing time timer handling?
>
> Kostas
>
> On Mon, Aug 17, 2020 at 2:52 PM David Anderson <[hidden email]>
> wrote:
> >
> > Kostas,
> >
> > I'm pleased to see some concrete details in this FLIP.
> >
> > I wonder if the current proposal goes far enough in the direction of
> recognizing the need some users may have for "batch" and "bounded
> streaming" to be treated differently. If I've understood it correctly, the
> section on scheduling allows me to choose STREAMING scheduling even if I
> have bounded sources. I like that approach, because it recognizes that even
> though I have bounded inputs, I don't necessarily want batch processing
> semantics. I think it makes sense to extend this idea to processing time
> support as well.
> >
> > My thinking is that sometimes in development and testing it's reasonable
> to run exactly the same job as in production, except with different sources
> and sinks. While it might be a reasonable default, I'm not convinced that
> switching a processing time streaming job to read from a bounded source
> should always cause it to fail.
> >
> > David
> >

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kostas Kloudas-5
Hi all,

Thanks for the comments!

@Dawid: "execution.mode" can be a nice alternative and from a quick
look it is not currently used by any configuration option. I will
update the FLIP accordingly.

@David: Given that having the option to allow timers to fire at the
end of the job is already in the FLIP, I will leave it as is and I
will update the default policy to be "ignore processing time timers
set by the user". This will allow existing dataStream programs to run
on bounded inputs. This update will affect point 2 in the "Processing
Time Support in Batch" section.

If these changes cover your proposals, then I would like to start a
voting thread tomorrow evening if this is ok with you.

Please let me know until then.

Kostas

On Tue, Aug 18, 2020 at 3:54 PM David Anderson <[hidden email]> wrote:

>
> Being able to optionally fire registered processing time timers at the end of a job would be interesting, and would help in (at least some of) the cases I have in mind. I don't have a better idea.
>
> David
>

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Guowei Ma
Hi, Klou

Thanks for your proposal. It's a very good idea.
Just a little comment about the "Batch vs Streaming Scheduling": in
AUTOMATIC execution mode we may not be able to pick BATCH execution mode
even if all sources are bounded. For example, some applications use the
`CheckpointListener`, which is not available in BATCH mode in the current
implementation.
So maybe we need more checks in the AUTOMATIC execution mode.

Best,
Guowei
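Guowei's extra check could fold into the AUTOMATIC decision roughly as sketched below. This is a standalone illustration of the idea, not the actual scheduler code; the class name, method name, and flags are hypothetical:

```java
// Illustrative sketch of the AUTOMATIC execution-mode decision, including
// Guowei's point: features that depend on checkpointing (e.g. an operator
// implementing CheckpointListener) are not available in BATCH mode, so
// their presence should force STREAMING even when all sources are bounded.
// Names are hypothetical, not actual Flink internals.
class ExecutionModeDecider {
    enum Mode { BATCH, STREAMING }

    static Mode decide(boolean allSourcesBounded, boolean usesCheckpointDependentFeatures) {
        if (!allSourcesBounded) {
            return Mode.STREAMING;   // unbounded input can never run as BATCH
        }
        if (usesCheckpointDependentFeatures) {
            return Mode.STREAMING;   // e.g. CheckpointListener needs checkpoints
        }
        return Mode.BATCH;           // all sources bounded, no blocking features
    }
}
```

Under this sketch, AUTOMATIC only selects BATCH when every source is bounded and no checkpoint-dependent feature is in use; everything else falls back to STREAMING.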



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kostas Kloudas-5
Hi Guowei,

Thanks for the insightful comment!

I agree that this can be a limitation of the current runtime, but I
think that this FLIP can go on as it discusses mainly the semantics
that the DataStream API will expose when applied on bounded data.
There will definitely be other FLIPs that will actually handle the
runtime-related topics.

But it is good to document them nevertheless, so that we can soon start
ironing out the remaining rough edges.

Cheers,
Kostas


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Kostas Kloudas-5
Thanks a lot for the discussion!

I will open a voting thread shortly!

Kostas


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Aljoscha Krettek-2
Hmm, it seems I left out the Dev ML in my mail. Looping that back in..


On 28.08.20 13:54, Dawid Wysakowicz wrote:

> @Aljoscha Let me bring back to the ML some of the points we discussed
> offline.
>
> Ad. 1 Yes I agree it's not just about scheduling. It includes more
> changes to the runtime. We might need to make it more prominent in the
> write-up.
>
> Ad. 2 You have a good point here that switching the default value for
> TimeCharacteristic to INGESTION time might not be the best option as it
> might hide problems if we assign ingestion time, which is rarely a right
> choice for user programs. Maybe we could just go with the EVENT_TIME as
> the default?
>
> Ad. 4 That's a very good point! I do agree with you it would be better
> to change the behaviour of said methods for batch-style execution. Even
> though it changes the behaviour, the overall logic is still correct.
> Moreover I'd also recommend deprecating some of the relational-like
> methods, which we should rather redirect to the Table API. I added a
> section about it to the FLIP (mostly copying over your message). Let me
> know what you think about it.
>
> Best,
>
> Dawid
>
> On 25/08/2020 11:39, Aljoscha Krettek wrote:
>> Thanks for creating this FLIP! I think the general direction is very
>> good but I think there are some specifics that we should also put in
>> there and that we may need to discuss here as well.
>>
>> ## About batch vs streaming scheduling
>>
>> I think we shouldn't call it "scheduling", because the decision
>> between bounded and unbounded affects more than just scheduling. It
>> affects how we do network transfers and the semantics of time, among
>> other things. So maybe we should differentiate between batch-style and
>> streaming-style execution, though I'm not sure I like those terms either.
>>
>> ## About processing-time support in batch
>>
>> It's not just about "batch": changing the default to ingestion time is
>> a change for stream processing as well. Actually, I don't know if
>> ingestion time even makes sense for batch processing. IIRC, with the
>> new sources we actually always have a timestamp, so this discussion
>> might be moot. Maybe Becket and/or Stephan (cc'ed) could chime in on
>> this.
>>
>> Also, I think it's right that we currently ignore processing-time
>> timers at the end of input in streaming jobs, but this has been a
>> source of trouble for users. See [1] and several discussions on the
>> ML. I'm also cc'ing Flavio here who also ran into this problem. I
>> think we should solve this quickly after laying the foundations of
>> bounded processing on the DataStream API.
>>
>> ## About broadcast state support
>>
>> I think as a low-hanging fruit we could just read the broadcast side
>> first and then switch to the regular input. We do need to be careful
>> with creating distributed deadlocks, though, so this might be trickier
>> than it seems at first.
>>
>> ## Loose ends and weird semantics
>>
>> There are some operations in the DataStream API that have semantics
>> that might make sense for stream processing but should behave
>> differently for batch. For example, KeyedStream.reduce() is
>> essentially a reduce on a GlobalWindow with a Trigger that fires on
>> every element. In DB terms it produces an UPSERT stream as an output,
>> if you get ten input elements for a key you also get ten output
>> records. For batch processing it might make more sense to instead only
>> produce one output record per key with the result of the aggregation.
>> This would be correct for downstream consumers that expect an UPSERT
>> stream but it would change the actual physical output stream that they
>> see.
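The upsert-versus-single-result behaviour described above can be simulated outside Flink entirely. Below is a minimal, purely illustrative Python sketch (not DataStream API code; the function names and data are made up) of the two output modes of a keyed reduce:

```python
def streaming_reduce(records, reduce_fn):
    # STREAM-style KeyedStream.reduce(): emit the running aggregate
    # after every input element (an "upsert" stream).
    acc, out = {}, []
    for key, value in records:
        acc[key] = value if key not in acc else reduce_fn(acc[key], value)
        out.append((key, acc[key]))
    return out

def batch_reduce(records, reduce_fn):
    # BATCH-style semantics: emit one final result per key.
    acc = {}
    for key, value in records:
        acc[key] = value if key not in acc else reduce_fn(acc[key], value)
    return sorted(acc.items())

records = [("a", 1), ("a", 2), ("b", 5), ("a", 3)]
print(streaming_reduce(records, lambda x, y: x + y))
# [('a', 1), ('a', 3), ('b', 5), ('a', 6)]  -- one output record per input
print(batch_reduce(records, lambda x, y: x + y))
# [('a', 6), ('b', 5)]                      -- one output record per key
```

Both variants compute the same final aggregate; only the shape of the physical output stream differs, which is exactly what a downstream consumer expecting an UPSERT stream would notice.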
>>
>> There might be other such operations in the DataStream API that have
>> slightly weird behaviour that doesn't make much sense when you do
>> bounded processing.
>>
>> Best,
>> Aljoscha
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>
>> On 24.08.20 11:29, Kostas Kloudas wrote:
>>> Thanks a lot for the discussion!
>>>
>>> I will open a voting thread shortly!
>>>
>>> Kostas
>>>
>>> On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas <[hidden email]>
>>> wrote:
>>>>
>>>> Hi Guowei,
>>>>
>>>> Thanks for the insightful comment!
>>>>
>>>> I agree that this can be a limitation of the current runtime, but I
>>>> think that this FLIP can go on as it discusses mainly the semantics
>>>> that the DataStream API will expose when applied on bounded data.
>>>> There will definitely be other FLIPs that will actually handle the
>>>> runtime-related topics.
>>>>
>>>> But it is good to document them nevertheless so that we start soon
>>>> ironing out the remaining rough edges.
>>>>
>>>> Cheers,
>>>> Kostas
>>>>
>>>> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma <[hidden email]> wrote:
>>>>>
>>>>> Hi, Klou
>>>>>
>>>>> Thanks for your proposal. It's a very good idea.
>>>>> Just a little comment about the "Batch vs Streaming Scheduling".
>>>>> In the AUTOMATIC execution mode maybe we could not pick BATCH
>>>>> execution mode even if all sources are bounded. For example some
>>>>> applications would use the `CheckpointListener`, which is not
>> >>>>> available in BATCH mode in the current implementation.
>>>>> So maybe we need more checks in the AUTOMATIC execution mode.
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas
>>>>> <[hidden email]> wrote:
>>>>>>
>>>>>> Hi all,
>>>>>>
>>>>>> Thanks for the comments!
>>>>>>
>>>>>> @Dawid: "execution.mode" can be a nice alternative and from a quick
>>>>>> look it is not used currently by any configuration option. I will
>>>>>> update the FLIP accordingly.
>>>>>>
>>>>>> @David: Given that having the option to allow timers to fire at the
>>>>>> end of the job is already in the FLIP, I will leave it as is and I
>>>>>> will update the default policy to be "ignore processing time timers
>> >>>>>> set by the user". This will allow existing DataStream programs to run
>>>>>> on bounded inputs. This update will affect point 2 in the "Processing
>>>>>> Time Support in Batch" section.
>>>>>>
>>>>>> If these changes cover your proposals, then I would like to start a
>>>>>> voting thread tomorrow evening if this is ok with you.
>>>>>>
>>>>>> Please let me know until then.
>>>>>>
>>>>>> Kostas
>>>>>>
>>>>>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson
>>>>>> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Being able to optionally fire registered processing time timers
>>>>>>> at the end of a job would be interesting, and would help in (at
>>>>>>> least some of) the cases I have in mind. I don't have a better idea.
>>>>>>>
>>>>>>> David
>>>>>>>
>>>>>>> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas
>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Hi Kurt and David,
>>>>>>>>
>>>>>>>> Thanks a lot for the insightful feedback!
>>>>>>>>
>>>>>>>> @Kurt: For the topic of checkpointing with Batch Scheduling, I
>>>>>>>> totally
>>>>>>>> agree with you that it requires a lot more work and careful
>>>>>>>> thinking
>>>>>>>> on the semantics. This FLIP was written under the assumption
>>>>>>>> that if
>>>>>>>> the user wants to have checkpoints on bounded input, he/she will
>>>>>>>> have
>>>>>>>> to go with STREAMING as the scheduling mode. Checkpointing for
>>>>>>>> BATCH
>>>>>>>> can be handled as a separate topic in the future.
>>>>>>>>
>>>>>>>> In the case of MIXED workloads and for this FLIP, the scheduling
>>>>>>>> mode
>>>>>>>> should be set to STREAMING. That is why the AUTOMATIC option sets
>>>>>>>> scheduling to BATCH only if all the sources are bounded. I am
>>>>>>>> not sure
>>>>>>>> what are the plans there at the scheduling level, as one could
>>>>>>>> imagine
>>>>>>>> in the future that in mixed workloads, we schedule first all the
>>>>>>>> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>>>>>>>> subgraph per application, which is going to be scheduled after all
>>>>>>>> Bounded ones have finished. Essentially the bounded subgraphs
>>>>>>>> will be
>>>>>>>> used to bootstrap the unbounded one. But, I am not aware of any
>>>>>>>> plans
>>>>>>>> towards that direction.
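The decision rule discussed above (AUTOMATIC picking BATCH only when all sources are bounded) plus Guowei's proposed extra guard can be expressed as a small sketch. The following Python is hypothetical and illustrative only, not Flink internals:

```python
def choose_execution_mode(sources_bounded, uses_checkpoint_listener=False):
    # AUTOMATIC as discussed in this thread: pick BATCH only if *every*
    # source is bounded. The CheckpointListener check is Guowei's
    # suggested additional guard, not something the FLIP specifies.
    if all(sources_bounded) and not uses_checkpoint_listener:
        return "BATCH"
    return "STREAMING"

print(choose_execution_mode([True, True]))         # BATCH
print(choose_execution_mode([True, False]))        # STREAMING: one unbounded source
print(choose_execution_mode([True, True], True))   # STREAMING: checkpoint listener in use
```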
>>>>>>>>
>>>>>>>>
>>>>>>>> @David: The processing time timer handling is a topic that has also
>>>>>>>> been discussed in the community in the past, and I do not
>>>>>>>> remember any
>>>>>>>> final conclusion unfortunately.
>>>>>>>>
>>>>>>>> In the current context and for bounded input, we chose to favor
>>>>>>>> reproducibility of the result, as this is expected in batch
>>>>>>>> processing
>>>>>>>> where the whole input is available in advance. This is why this
>>>>>>>> proposal suggests to not allow processing time timers. But I
>>>>>>>> understand your argument that the user may want to be able to
>>>>>>>> run the
>> >>>>>>>> same pipeline on batch and streaming; this is why we added the two
>>>>>>>> options under future work, namely (from the FLIP):
>>>>>>>>
>>>>>>>> ```
>>>>>>>> Future Work: In the future we may consider adding as options the
>>>>>>>> capability of:
>>>>>>>> * firing all the registered processing time timers at the end of
>>>>>>>> a job
>>>>>>>> (at close()) or,
>>>>>>>> * ignoring all the registered processing time timers at the end
>>>>>>>> of a job.
>>>>>>>> ```
>>>>>>>>
>> >>>>>>>> Conceptually, we are essentially saying that batch
>>>>>>>> execution is assumed to be instantaneous and refers to a single
>>>>>>>> "point" in time and any processing-time timers for the future
>>>>>>>> may fire
>>>>>>>> at the end of execution or be ignored (but not throw an
>>>>>>>> exception). I
>>>>>>>> could also see ignoring the timers in batch as the default, if this
>>>>>>>> makes more sense.
>>>>>>>>
>>>>>>>> By the way, do you have any usecases in mind that will help us
>>>>>>>> better
>>>>>>>> shape our processing time timer handling?
>>>>>>>>
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Kostas,
>>>>>>>>>
>>>>>>>>> I'm pleased to see some concrete details in this FLIP.
>>>>>>>>>
>>>>>>>>> I wonder if the current proposal goes far enough in the
>>>>>>>>> direction of recognizing the need some users may have for
>>>>>>>>> "batch" and "bounded streaming" to be treated differently. If
>>>>>>>>> I've understood it correctly, the section on scheduling allows
>>>>>>>>> me to choose STREAMING scheduling even if I have bounded
>>>>>>>>> sources. I like that approach, because it recognizes that even
>>>>>>>>> though I have bounded inputs, I don't necessarily want batch
>>>>>>>>> processing semantics. I think it makes sense to extend this
>>>>>>>>> idea to processing time support as well.
>>>>>>>>>
>>>>>>>>> My thinking is that sometimes in development and testing it's
>>>>>>>>> reasonable to run exactly the same job as in production, except
>>>>>>>>> with different sources and sinks. While it might be a
>>>>>>>>> reasonable default, I'm not convinced that switching a
>>>>>>>>> processing time streaming job to read from a bounded source
>>>>>>>>> should always cause it to fail.
>>>>>>>>>
>>>>>>>>> David
>>>>>>>>>
>>>>>>>>> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas
>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> As described in FLIP-131 [1], we are aiming at deprecating the
>>>>>>>>>> DataSet
>>>>>>>>>> API in favour of the DataStream API and the Table API. After
>>>>>>>>>> this work
>>>>>>>>>> is done, the user will be able to write a program using the
>>>>>>>>>> DataStream
>>>>>>>>>> API and this will execute efficiently on both bounded and
>>>>>>>>>> unbounded
>>>>>>>>>> data. But before we reach this point, it is worth discussing and
>>>>>>>>>> agreeing on the semantics of some operations as we transition
>>>>>>>>>> from the
>>>>>>>>>> streaming world to the batch one.
>>>>>>>>>>
>>>>>>>>>> This thread and the associated FLIP [2] aim at discussing
>>>>>>>>>> these issues
>>>>>>>>>> as these topics are pretty important to users and can lead to
>>>>>>>>>> unpleasant surprises if we do not pay attention.
>>>>>>>>>>
>>>>>>>>>> Let's have a healthy discussion here and I will be updating
>>>>>>>>>> the FLIP
>>>>>>>>>> accordingly.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>>>>>>>>>> [2]
>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>>
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

dwysakowicz
Hey Aljoscha

A couple of thoughts for the two remaining TODOs in the doc:

# Processing Time Support in BATCH/BOUNDED execution mode

I think there are two somewhat orthogonal problems around this topic:
    1. Firing processing timers at the end of the job
    2. Having processing timers in the BATCH mode
The way I see it there are three main use cases for different
combinations of the aforementioned dimensions:
    1. Regular streaming jobs: STREAM mode with UNBOUNDED sources
       - we do want to have processing timers
       - there is no end of the job
    2. Debugging/testing streaming jobs: STREAM mode with BOUNDED sources
       - we do want to have processing timers
       - we want to fire/wait for the timers at the end
    3. Batch jobs with the DataStream API:
       - we do **NOT** want to have processing timers, either during
processing or at the end. We want to either fail hard or ignore the
timers. Generally speaking, processing timers do not make sense in BATCH
mode, so failing hard would be better. It would be the safest option, as
some of the user logic might depend on the processing timers, and failing
hard would give the user the opportunity to react to the changed
behaviour. On the other hand, if we want to make it possible to run the
exact same program in both STREAM and BATCH mode, we must have an option
to simply ignore processing timers.
       - we never want to actually trigger the timers, neither during
runtime nor at the end

Having the above in mind, I am thinking if we should introduce two
separate options:
    * processing-time.timers = ENABLE/FAIL/IGNORE
    * processing-time.on-end-of-input = CANCEL/WAIT/TRIGGER
With the two options we can satisfy all the above cases. The default
settings would be:
STREAM:
    processing-time.timers = ENABLE
    processing-time.on-end-of-input = TRIGGER
BATCH:
    processing-time.timers = IGNORE
    processing-time.on-end-of-input = CANCEL
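To make the interplay of the two proposed options concrete, here is a small Python simulation of the dispatch logic. The option names are the proposal above, not existing Flink configuration keys, and the class is purely illustrative:

```python
from enum import Enum

class TimerPolicy(Enum):        # proposed processing-time.timers
    ENABLE = 1
    FAIL = 2
    IGNORE = 3

class EndOfInputPolicy(Enum):   # proposed processing-time.on-end-of-input
    CANCEL = 1
    WAIT = 2
    TRIGGER = 3

class TimerService:
    def __init__(self, policy, end_policy):
        self.policy, self.end_policy = policy, end_policy
        self.pending, self.fired = [], []

    def register(self, ts):
        if self.policy is TimerPolicy.FAIL:
            raise RuntimeError("processing-time timers not allowed in BATCH")
        if self.policy is TimerPolicy.ENABLE:
            self.pending.append(ts)
        # IGNORE: silently drop the timer

    def end_of_input(self):
        if self.end_policy is EndOfInputPolicy.TRIGGER:
            self.fired.extend(sorted(self.pending))  # fire all immediately
        # WAIT would block until wall clock reaches each pending timer;
        # CANCEL simply drops whatever is still pending.
        self.pending.clear()

# STREAM defaults (ENABLE + TRIGGER): timers fire at end of input.
stream = TimerService(TimerPolicy.ENABLE, EndOfInputPolicy.TRIGGER)
stream.register(42)
stream.end_of_input()
print(stream.fired)   # [42]

# BATCH defaults (IGNORE + CANCEL): timers are silently dropped.
batch = TimerService(TimerPolicy.IGNORE, EndOfInputPolicy.CANCEL)
batch.register(42)
batch.end_of_input()
print(batch.fired)    # []
```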

# Event time triggers
Speaking from the implementation perspective, I find it hard to
actually ignore the event-time triggers. We would have to adjust the
implementation of WindowOperator to do that. At the same time, I see no
problem with simply keeping them working, so I am wondering if we
should/could just leave them as they are.

# Broadcast State
As far as I am concerned, there are no core semantic problems with the
Broadcast State. As of now, it does not give any guarantees about the
order in which the broadcast and non-broadcast sides are executed, even
in streaming. It also does not expose any mechanism to implement
event/processing-time alignment (you cannot register timers on the
broadcast side). I can't see any of these guarantees breaking in BATCH
mode.
I do agree it would give somewhat nicer properties in BATCH if we
consumed the broadcast side first. It would make the operation
deterministic and let users implement a broadcast join properly on top
of this method. Nevertheless, I see it as an extension of the DataStream
API for BATCH execution rather than making the DataStream API work for
BATCH. Therefore I'd be fine with leaving the Broadcast State out
of the FLIP.
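The "consume the broadcast side first" property can be sketched as follows. This is a hypothetical Python illustration (not DataStream API code) of why fully consuming the broadcast side before the regular side makes a broadcast join deterministic:

```python
def broadcast_join(broadcast_side, regular_side, join_fn):
    # BATCH-style ordering: the broadcast side is consumed completely
    # first, so every regular-side record sees the *full* broadcast
    # state. In streaming, interleaving of the two sides would make the
    # per-record view of the state nondeterministic.
    state = {}
    for key, value in broadcast_side:
        state[key] = value
    return [join_fn(rec, state.get(rec[0])) for rec in regular_side]

rules = [("a", "RULE-1"), ("b", "RULE-2")]          # broadcast side
events = [("a", 10), ("b", 20), ("a", 30)]          # regular side
print(broadcast_join(rules, events, lambda rec, rule: (rec[1], rule)))
# [(10, 'RULE-1'), (20, 'RULE-2'), (30, 'RULE-1')]
```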

What do you think?

On 01/09/2020 13:46, Aljoscha Krettek wrote:

> Hmm, it seems I left out the Dev ML in my mail. Looping that back in..
>
>
> On 28.08.20 13:54, Dawid Wysakowicz wrote:
>> @Aljoscha Let me bring back to the ML some of the points we discussed
>> offline.
>>
>> Ad. 1 Yes I agree it's not just about scheduling. It includes more
>> changes to the runtime. We might need to make it more prominent in the
>> write up.
>>
>> Ad. 2 You have a good point here that switching the default value for
>> TimeCharacteristic to INGESTION time might not be the best option as it
>> might hide problems if we assign ingestion time, which is rarely a right
>> choice for user programs. Maybe we could just go with the EVENT_TIME as
>> the default?
>>
>> Ad. 4 That's a very good point! I do agree with you it would be better
>> to change the behaviour of said methods for batch-style execution. Even
>> though it changes the behaviour, the overall logic is still correct.
>> Moreover I'd also recommend deprecating some of the relational-like
>> methods, which we should rather redirect to the Table API. I added a
>> section about it to the FLIP (mostly copying over your message). Let me
>> know what you think about it.
>>
>> Best,
>>
>> Dawid
>>
>> On 25/08/2020 11:39, Aljoscha Krettek wrote:
>>> Thanks for creating this FLIP! I think the general direction is very
>>> good but I think there are some specifics that we should also put in
>>> there and that we may need to discuss here as well.
>>>
>>> ## About batch vs streaming scheduling
>>>
>>> I think we shouldn't call it "scheduling", because the decision
>>> between bounded and unbounded affects more than just scheduling. It
>>> affects how we do network transfers and the semantics of time, among
>>> other things. So maybe we should differentiate between batch-style and
>>> streaming-style execution, though I'm not sure I like those terms
>>> either.
>>>
>>> ## About processing-time support in batch
>>>
>>> It's not just about "batch": changing the default to ingestion time is
>>> a change for stream processing as well. Actually, I don't know if
>>> ingestion time even makes sense for batch processing. IIRC, with the
>>> new sources we actually always have a timestamp, so this discussion
>>> might be moot. Maybe Becket and/or Stephan (cc'ed) could chime in on
>>> this.
>>>
>>> Also, I think it's right that we currently ignore processing-time
>>> timers at the end of input in streaming jobs, but this has been a
>>> source of trouble for users. See [1] and several discussions on the
>>> ML. I'm also cc'ing Flavio here who also ran into this problem. I
>>> think we should solve this quickly after laying the foundations of
>>> bounded processing on the DataStream API.
>>>
>>> ## About broadcast state support
>>>
>>> I think as a low-hanging fruit we could just read the broadcast side
>>> first and then switch to the regular input. We do need to be careful
>>> with creating distributed deadlocks, though, so this might be trickier
>>> than it seems at first.
>>>
>>> ## Loose ends and weird semantics
>>>
>>> There are some operations in the DataStream API that have semantics
>>> that might make sense for stream processing but should behave
>>> differently for batch. For example, KeyedStream.reduce() is
>>> essentially a reduce on a GlobalWindow with a Trigger that fires on
>>> every element. In DB terms it produces an UPSERT stream as an output,
>>> if you get ten input elements for a key you also get ten output
>>> records. For batch processing it might make more sense to instead only
>>> produce one output record per key with the result of the aggregation.
>>> This would be correct for downstream consumers that expect an UPSERT
>>> stream but it would change the actual physical output stream that they
>>> see.
>>>
>>> There might be other such operations in the DataStream API that have
>>> slightly weird behaviour that doesn't make much sense when you do
>>> bounded processing.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>>
>>> On 24.08.20 11:29, Kostas Kloudas wrote:
>>>> Thanks a lot for the discussion!
>>>>
>>>> I will open a voting thread shortly!
>>>>
>>>> Kostas
>>>>
>>>> On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas <[hidden email]>
>>>> wrote:
>>>>>
>>>>> Hi Guowei,
>>>>>
>>>>> Thanks for the insightful comment!
>>>>>
>>>>> I agree that this can be a limitation of the current runtime, but I
>>>>> think that this FLIP can go on as it discusses mainly the semantics
>>>>> that the DataStream API will expose when applied on bounded data.
>>>>> There will definitely be other FLIPs that will actually handle the
>>>>> runtime-related topics.
>>>>>
>>>>> But it is good to document them nevertheless so that we start soon
>>>>> ironing out the remaining rough edges.
>>>>>
>>>>> Cheers,
>>>>> Kostas
>>>>>
>>>>> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma <[hidden email]>
>>>>> wrote:
>>>>>>
>>>>>> Hi, Klou
>>>>>>
>>>>>> Thanks for your proposal. It's a very good idea.
>>>>>> Just a little comment about the "Batch vs Streaming Scheduling".
>>>>>> In the AUTOMATIC execution mode maybe we could not pick BATCH
>>>>>> execution mode even if all sources are bounded. For example some
>>>>>> applications would use the `CheckpointListener`, which is not
>>>>>> available in BATCH mode in the current implementation.
>>>>>> So maybe we need more checks in the AUTOMATIC execution mode.
>>>>>>
>>>>>> Best,
>>>>>> Guowei
>>>>>>
>>>>>>
>>>>>> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas
>>>>>> <[hidden email]> wrote:
>>>>>>>
>>>>>>> Hi all,
>>>>>>>
>>>>>>> Thanks for the comments!
>>>>>>>
>>>>>>> @Dawid: "execution.mode" can be a nice alternative and from a quick
>>>>>>> look it is not used currently by any configuration option. I will
>>>>>>> update the FLIP accordingly.
>>>>>>>
>>>>>>> @David: Given that having the option to allow timers to fire at the
>>>>>>> end of the job is already in the FLIP, I will leave it as is and I
>>>>>>> will update the default policy to be "ignore processing time timers
>>>>>>> set by the user". This will allow existing DataStream programs
>>>>>>> to run
>>>>>>> on bounded inputs. This update will affect point 2 in the
>>>>>>> "Processing
>>>>>>> Time Support in Batch" section.
>>>>>>>
>>>>>>> If these changes cover your proposals, then I would like to start a
>>>>>>> voting thread tomorrow evening if this is ok with you.
>>>>>>>
>>>>>>> Please let me know until then.
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson
>>>>>>> <[hidden email]> wrote:
>>>>>>>>
>>>>>>>> Being able to optionally fire registered processing time timers
>>>>>>>> at the end of a job would be interesting, and would help in (at
>>>>>>>> least some of) the cases I have in mind. I don't have a better
>>>>>>>> idea.
>>>>>>>>
>>>>>>>> David
>>>>>>>>
>>>>>>>> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas
>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>
>>>>>>>>> Hi Kurt and David,
>>>>>>>>>
>>>>>>>>> Thanks a lot for the insightful feedback!
>>>>>>>>>
>>>>>>>>> @Kurt: For the topic of checkpointing with Batch Scheduling, I
>>>>>>>>> totally
>>>>>>>>> agree with you that it requires a lot more work and careful
>>>>>>>>> thinking
>>>>>>>>> on the semantics. This FLIP was written under the assumption
>>>>>>>>> that if
>>>>>>>>> the user wants to have checkpoints on bounded input, he/she will
>>>>>>>>> have
>>>>>>>>> to go with STREAMING as the scheduling mode. Checkpointing for
>>>>>>>>> BATCH
>>>>>>>>> can be handled as a separate topic in the future.
>>>>>>>>>
>>>>>>>>> In the case of MIXED workloads and for this FLIP, the scheduling
>>>>>>>>> mode
>>>>>>>>> should be set to STREAMING. That is why the AUTOMATIC option sets
>>>>>>>>> scheduling to BATCH only if all the sources are bounded. I am
>>>>>>>>> not sure
>>>>>>>>> what are the plans there at the scheduling level, as one could
>>>>>>>>> imagine
>>>>>>>>> in the future that in mixed workloads, we schedule first all the
>>>>>>>>> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>>>>>>>>> subgraph per application, which is going to be scheduled after
>>>>>>>>> all
>>>>>>>>> Bounded ones have finished. Essentially the bounded subgraphs
>>>>>>>>> will be
>>>>>>>>> used to bootstrap the unbounded one. But, I am not aware of any
>>>>>>>>> plans
>>>>>>>>> towards that direction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> @David: The processing time timer handling is a topic that has
>>>>>>>>> also
>>>>>>>>> been discussed in the community in the past, and I do not
>>>>>>>>> remember any
>>>>>>>>> final conclusion unfortunately.
>>>>>>>>>
>>>>>>>>> In the current context and for bounded input, we chose to favor
>>>>>>>>> reproducibility of the result, as this is expected in batch
>>>>>>>>> processing
>>>>>>>>> where the whole input is available in advance. This is why this
>>>>>>>>> proposal suggests to not allow processing time timers. But I
>>>>>>>>> understand your argument that the user may want to be able to
>>>>>>>>> run the
>>>>>>>>> same pipeline on batch and streaming; this is why we added the two
>>>>>>>>> options under future work, namely (from the FLIP):
>>>>>>>>>
>>>>>>>>> ```
>>>>>>>>> Future Work: In the future we may consider adding as options the
>>>>>>>>> capability of:
>>>>>>>>> * firing all the registered processing time timers at the end of
>>>>>>>>> a job
>>>>>>>>> (at close()) or,
>>>>>>>>> * ignoring all the registered processing time timers at the end
>>>>>>>>> of a job.
>>>>>>>>> ```
>>>>>>>>>
>>>>>>>>> Conceptually, we are essentially saying that batch
>>>>>>>>> execution is assumed to be instantaneous and refers to a single
>>>>>>>>> "point" in time and any processing-time timers for the future
>>>>>>>>> may fire
>>>>>>>>> at the end of execution or be ignored (but not throw an
>>>>>>>>> exception). I
>>>>>>>>> could also see ignoring the timers in batch as the default, if
>>>>>>>>> this
>>>>>>>>> makes more sense.
>>>>>>>>>
>>>>>>>>> By the way, do you have any usecases in mind that will help us
>>>>>>>>> better
>>>>>>>>> shape our processing time timer handling?
>>>>>>>>>
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson
>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Kostas,
>>>>>>>>>>
>>>>>>>>>> I'm pleased to see some concrete details in this FLIP.
>>>>>>>>>>
>>>>>>>>>> I wonder if the current proposal goes far enough in the
>>>>>>>>>> direction of recognizing the need some users may have for
>>>>>>>>>> "batch" and "bounded streaming" to be treated differently. If
>>>>>>>>>> I've understood it correctly, the section on scheduling allows
>>>>>>>>>> me to choose STREAMING scheduling even if I have bounded
>>>>>>>>>> sources. I like that approach, because it recognizes that even
>>>>>>>>>> though I have bounded inputs, I don't necessarily want batch
>>>>>>>>>> processing semantics. I think it makes sense to extend this
>>>>>>>>>> idea to processing time support as well.
>>>>>>>>>>
>>>>>>>>>> My thinking is that sometimes in development and testing it's
>>>>>>>>>> reasonable to run exactly the same job as in production, except
>>>>>>>>>> with different sources and sinks. While it might be a
>>>>>>>>>> reasonable default, I'm not convinced that switching a
>>>>>>>>>> processing time streaming job to read from a bounded source
>>>>>>>>>> should always cause it to fail.
>>>>>>>>>>
>>>>>>>>>> David
>>>>>>>>>>
>>>>>>>>>> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas
>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> As described in FLIP-131 [1], we are aiming at deprecating the
>>>>>>>>>>> DataSet
>>>>>>>>>>> API in favour of the DataStream API and the Table API. After
>>>>>>>>>>> this work
>>>>>>>>>>> is done, the user will be able to write a program using the
>>>>>>>>>>> DataStream
>>>>>>>>>>> API and this will execute efficiently on both bounded and
>>>>>>>>>>> unbounded
>>>>>>>>>>> data. But before we reach this point, it is worth discussing
>>>>>>>>>>> and
>>>>>>>>>>> agreeing on the semantics of some operations as we transition
>>>>>>>>>>> from the
>>>>>>>>>>> streaming world to the batch one.
>>>>>>>>>>>
>>>>>>>>>>> This thread and the associated FLIP [2] aim at discussing
>>>>>>>>>>> these issues
>>>>>>>>>>> as these topics are pretty important to users and can lead to
>>>>>>>>>>> unpleasant surprises if we do not pay attention.
>>>>>>>>>>>
>>>>>>>>>>> Let's have a healthy discussion here and I will be updating
>>>>>>>>>>> the FLIP
>>>>>>>>>>> accordingly.
>>>>>>>>>>>
>>>>>>>>>>> Cheers,
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>>>>>>>>>>>
>>>>>>>>>>> [2]
>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>>>>>>>>>>>
>>>
>>
>



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Aljoscha Krettek-2
I updated the FLIP, you can check out the changes here:
https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158871522&selectedPageVersions=16&selectedPageVersions=15

There is still the open question of what IGNORE means for
getProcessingTime().

Plus, I introduced a setting for ignoring Triggers because I think it
otherwise doesn't work well with FAILing hard on processing-time API
calls. I described it in the FLIP, so please have a look at the diff I
linked above.

Aljoscha


On 08.09.20 11:35, Dawid Wysakowicz wrote:

>> The only one where I could see that users want different behaviour
>> is for BATCH jobs on the DataStream API. I agree that processing-time does
>> not make much sense in batch jobs. However, if users have written some
>> business logic using processing-time timers their jobs will silently
>> not work if we set the default to IGNORE. Setting it to FAIL would at
>> least make users aware that something is not right.
> I see your point. I was also undecided myself which option to use here.
> I went with IGNORE because I thought the common/most
> prominent functions should work out of the box without much
> additional tweaking. I found the case of "running the same program in
> BATCH and STREAM" to be one such case and therefore optimized the options
> for that case. That's why I went with IGNORE instead of FAIL. Again, I am
> good with either of the two.
>
>> I can also see a small group of users wanting processing-time timers
>> for BATCH. We could, for example, fire all processing-time timers at
>> the "end of input", then we also set the watermark to +Inf.
> I agree. I think this case would be covered with ENABLE + TRIGGER. I do
> agree though it makes sense to mention this case explicitly as the
> ENABLE option would behave slightly different in BATCH than in STREAM.
> Maybe not strictly speaking different, but it would be worth explaining
> anyway. As some people put it, you can think of BATCH
> processing happening instantaneously in processing time. Therefore there
> can be no timers triggered in between records. In BATCH processing the
> only time when timers can be triggered is at the end of input. Or at
> least that is how I see it.
>
> >> Another thing is: what should we do with new triggers that are set
> >> after the end-of-input? If we have TRIGGER and users keep setting new
> >> processing-time timers in the callback, would we continue firing them?
> >> Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off
> >> remaining timers but don't add new ones? Do we silently ignore adding
> >> new ones?
> My take on this issue is that it should be good enough to have the
> QUIESCE_AND_TRIGGER behaviour while ignoring timers registered after the
> end of input. We cannot fail hard in such a scenario, unless we expose a
> flag saying the timer is after the end of input; otherwise I cannot see
> a way to correctly safeguard against this scenario. I can see some use
> cases that would benefit from allowing timer registration, e.g.
> periodically checking if some external process has finished. In my opinion
> this is a bit of a different topic, as it is actually an issue of
> inverting the control of when an operator can finish. Right now it is the
> task that decides that the job/operator finishes at the end of input.
>
>> By the way, I assume WAIT means we wait for processing-time to
>> actually reach the time of pending timers? Or did you have something
>> else in mind with this?
> Yes, that's what I meant. I actually took the options from this
> issue[1], where there is some discussion on that topic as well.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647
>
> On 08/09/2020 10:57, Aljoscha Krettek wrote:
>> I agree with almost all of your points!
>>
>> The only one where I could see that users want different behaviour
>> is for BATCH jobs on the DataStream API. I agree that processing-time does
>> not make much sense in batch jobs. However, if users have written some
>> business logic using processing-time timers their jobs will silently
>> not work if we set the default to IGNORE. Setting it to FAIL would at
>> least make users aware that something is not right.
>>
>> I can also see a small group of users wanting processing-time timers
>> for BATCH. We could, for example, fire all processing-time timers at
>> the "end of input", then we also set the watermark to +Inf.
>>
>> Another thing is: what should we do with new triggers that are set
>> after the end-of-input. If we have TRIGGER and users keep setting new
>> processing-time timers in the callback, would we continue firing them?
>> Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off
>> remaining timers but don't add new ones? Do we silently ignore adding
>> new ones?
>>
>> By the way, I assume WAIT means we wait for processing-time to
>> actually reach the time of pending timers? Or did you have something
>> else in mind with this?
>>
>> Aljoscha
>>
>> On 08.09.20 09:19, Dawid Wysakowicz wrote:
>>> Hey Aljoscha
>>>
>>> A couple of thoughts for the two remaining TODOs in the doc:
>>>
>>> # Processing Time Support in BATCH/BOUNDED execution mode
>>>
>>> I think there are two somewhat orthogonal problems around this topic:
>>>     1. Firing processing timers at the end of the job
>>>     2. Having processing timers in the BATCH mode
>>> The way I see it there are three main use cases for different
>>> combinations of the aforementioned dimensions:
>>>     1. Regular streaming jobs: STREAM mode with UNBOUNDED sources
>>>        - we do want to have processing timers
>>>        - there is no end of the job
>>>     2. Debugging/Testing streaming jobs: STREAM mode with BOUNDED sources
>>>        - we do want to have processing timers
>>>        - we want to fire/wait for the timers at the end
>>>     3. Batch jobs with the DataStream API:
>>>        - we do **NOT** want to have processing timers, either during
>>> processing or at the end. We want to either fail hard or ignore the
>>> timers. Generally speaking, in BATCH mode the processing timers do not
>>> make sense, therefore it would be better to fail hard. It would be the
>>> safest option, as some of the user logic might depend on the processing
>>> timers. Failing hard would give the user the opportunity to react to the
>>> changed behaviour. On the other hand, if we want to make it possible to
>>> run the exact same program both in STREAM and BATCH mode, we must have an
>>> option to simply ignore processing timers.
>>>        - we never want to actually trigger the timers, neither during
>>> runtime nor at the end
>>>
>>> Having the above in mind, I am thinking if we should introduce two
>>> separate options:
>>>      * processing-time.timers = ENABLE/FAIL/IGNORE
>>>      * processing-time.on-end-of-input = CANCEL/WAIT/TRIGGER
>>> With the two options we can satisfy all the above cases. The default
>>> settings would be:
>>> STREAM:
>>>     processing-time.timers = ENABLE
>>>     processing-time.on-end-of-input = TRIGGER
>>> BATCH:
>>>     processing-time.timers = IGNORE
>>>     processing-time.on-end-of-input = CANCEL
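The two proposed options and their per-mode defaults can be sketched as plain enums (the option names come from the proposal above; the Java class and method names are invented for illustration and are not actual Flink API):

```java
class ProcessingTimeOptions {
    // processing-time.timers
    enum TimerBehaviour { ENABLE, FAIL, IGNORE }
    // processing-time.on-end-of-input
    enum EndOfInputBehaviour { CANCEL, WAIT, TRIGGER }

    final TimerBehaviour timers;
    final EndOfInputBehaviour onEndOfInput;

    ProcessingTimeOptions(TimerBehaviour timers, EndOfInputBehaviour onEndOfInput) {
        this.timers = timers;
        this.onEndOfInput = onEndOfInput;
    }

    // Proposed defaults: STREAM keeps timers working and fires pending ones
    // at the end of input; BATCH ignores timers and cancels pending ones.
    static ProcessingTimeOptions defaultsFor(String executionMode) {
        switch (executionMode) {
            case "STREAM":
                return new ProcessingTimeOptions(TimerBehaviour.ENABLE, EndOfInputBehaviour.TRIGGER);
            case "BATCH":
                return new ProcessingTimeOptions(TimerBehaviour.IGNORE, EndOfInputBehaviour.CANCEL);
            default:
                throw new IllegalArgumentException("Unknown execution mode: " + executionMode);
        }
    }
}
```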
>>>
>>> # Event time triggers
>>> I can say that only from the implementation perspective, but I find it hard to
>>> actually ignore the event-time triggers. We would have to adjust the
>>> implementation of WindowOperator to do that. At the same time I see no
>>> problem with simply keeping them working. I am wondering if we
>>> should/could just leave them as they are.
>>>
>>> # Broadcast State
>>> As far as I am concerned there are no core semantic problems with the
>>> Broadcast State. As of now, it does not give any guarantees about the
>>> order in which the broadcast and non-broadcast sides are executed even
>>> in streaming. It also does not expose any mechanisms to implement an
>>> event/processing-time alignments (you cannot register timers in the
>>> broadcast side). I can't see any of the guarantees breaking in the BATCH
>>> mode.
>>> I do agree it would give somewhat nicer properties in BATCH if we
>>> consumed the broadcast side first. It would make the operation
>>> deterministic and let users implement a broadcast join properly on top
>>> of this method. Nevertheless I see it as an extension of the DataStream
>>> API for BATCH execution rather than making the DataStream API work for
>>> BATCH. Therefore I'd be fine with leaving the Broadcast State out
>>> of the FLIP.
>>>
>>> What do you think?
>>>
>>> On 01/09/2020 13:46, Aljoscha Krettek wrote:
>>>> Hmm, it seems I left out the Dev ML in my mail. Looping that back in..
>>>>
>>>>
>>>> On 28.08.20 13:54, Dawid Wysakowicz wrote:
>>>>> @Aljoscha Let me bring back to the ML some of the points we discussed
>>>>> offline.
>>>>>
>>>>> Ad. 1 Yes I agree it's not just about scheduling. It includes more
>>>>> changes to the runtime. We might need to make it more prominent in the
>>>>> write up.
>>>>>
>>>>> Ad. 2 You have a good point here that switching the default value for
>>>>> TimeCharacteristic to INGESTION time might not be the best option
>>>>> as it
>>>>> might hide problems if we assign ingestion time, which is rarely the
>>>>> right
>>>>> choice for user programs. Maybe we could just go with the
>>>>> EVENT_TIME as
>>>>> the default?
>>>>>
>>>>> Ad. 4 That's a very good point! I do agree with you it would be better
>>>>> to change the behaviour of said methods for batch-style execution.
>>>>> Even
>>>>> though it changes the behaviour, the overall logic is still correct.
>>>>> Moreover I'd also recommend deprecating some of the relational-like
>>>>> methods, which we should rather redirect to the Table API. I added a
>>>>> section about it to the FLIP (mostly copying over your message).
>>>>> Let me
>>>>> know what you think about it.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
>>>>>
>>>>> On 25/08/2020 11:39, Aljoscha Krettek wrote:
>>>>>> Thanks for creating this FLIP! I think the general direction is very
>>>>>> good but I think there are some specifics that we should also put in
>>>>>> there and that we may need to discuss here as well.
>>>>>>
>>>>>> ## About batch vs streaming scheduling
>>>>>>
>>>>>> I think we shouldn't call it "scheduling", because the decision
>>>>>> between bounded and unbounded affects more than just scheduling. It
>>>>>> affects how we do network transfers and the semantics of time, among
>>>>>> other things. So maybe we should differentiate between batch-style
>>>>>> and
>>>>>> streaming-style execution, though I'm not sure I like those terms
>>>>>> either.
>>>>>>
>>>>>> ## About processing-time support in batch
>>>>>>
>>>>>> It's not just about "batch": changing the default to ingestion time is
>>>>>> a change for stream processing as well. Actually, I don't know if
>>>>>> ingestion time even makes sense for batch processing. IIRC, with the
>>>>>> new sources we actually always have a timestamp, so this discussion
>>>>>> might be moot. Maybe Becket and/or Stephan (cc'ed) could chime in on
>>>>>> this.
>>>>>>
>>>>>> Also, I think it's right that we currently ignore processing-time
>>>>>> timers at the end of input in streaming jobs, but this has been a
>>>>>> source of trouble for users. See [1] and several discussions on the
>>>>>> ML. I'm also cc'ing Flavio here who also ran into this problem. I
>>>>>> think we should solve this quickly after laying the foundations of
>>>>>> bounded processing on the DataStream API.
>>>>>>
>>>>>> ## About broadcast state support
>>>>>>
>>>>>> I think as a low-hanging fruit we could just read the broadcast side
>>>>>> first and then switch to the regular input. We do need to be careful
>>>>>> with creating distributed deadlocks, though, so this might be
>>>>>> trickier
>>>>>> than it seems at first.
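The "broadcast side first" idea can be illustrated with a small plain-Java simulation (no Flink dependencies; the join logic is a made-up example): fully materializing the bounded broadcast side before touching the regular input makes the result deterministic, which is exactly the property needed for a broadcast join.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class BroadcastSideFirst {
    // Phase 1: consume the bounded broadcast side completely into state.
    // Phase 2: stream the regular side against that state (a broadcast hash join).
    static List<String> run(List<String> broadcastSide, List<String> regularSide) {
        Set<String> broadcastState = new HashSet<>(broadcastSide);
        List<String> out = new ArrayList<>();
        for (String element : regularSide) {
            if (broadcastState.contains(element)) {
                out.add("matched:" + element);
            }
        }
        return out;
    }
}
```

In a real distributed runtime this fixed ordering is what risks the deadlocks mentioned above, since a producer feeding the regular input may block on backpressure while the broadcast side is still being drained.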
>>>>>>
>>>>>> ## Loose ends and weird semantics
>>>>>>
>>>>>> There are some operations in the DataStream API that have semantics
>>>>>> that might make sense for stream processing but should behave
>>>>>> differently for batch. For example, KeyedStream.reduce() is
>>>>>> essentially a reduce on a GlobalWindow with a Trigger that fires on
>>>>>> every element. In DB terms it produces an UPSERT stream as an output:
>>>>>> if you get ten input elements for a key you also get ten output
>>>>>> records. For batch processing it might make more sense to instead
>>>>>> only
>>>>>> produce one output record per key with the result of the aggregation.
>>>>>> This would be correct for downstream consumers that expect an UPSERT
>>>>>> stream but it would change the actual physical output stream that
>>>>>> they
>>>>>> see.
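The difference can be made concrete with a plain-Java simulation of a reduce over a single key (`Integer::sum` stands in for the user's ReduceFunction; this is a sketch, not Flink code): streaming-style emits the running aggregate after every element, while batch-style emits only the final aggregate.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

class ReduceSemantics {
    // STREAM-style: one output record per input record (an UPSERT stream).
    static List<Integer> streamingReduce(List<Integer> input, BinaryOperator<Integer> fn) {
        List<Integer> out = new ArrayList<>();
        Integer acc = null;
        for (int value : input) {
            acc = (acc == null) ? value : fn.apply(acc, value);
            out.add(acc); // emit the updated aggregate for every element
        }
        return out;
    }

    // BATCH-style: a single output record once the bounded input ends.
    static List<Integer> batchReduce(List<Integer> input, BinaryOperator<Integer> fn) {
        Integer acc = null;
        for (int value : input) {
            acc = (acc == null) ? value : fn.apply(acc, value);
        }
        return acc == null ? List.of() : List.of(acc);
    }
}
```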
>>>>>>
>>>>>> There might be other such operations in the DataStream API that have
>>>>>> slightly weird behaviour that doesn't make much sense when you do
>>>>>> bounded processing.
>>>>>>
>>>>>> Best,
>>>>>> Aljoscha
>>>>>>
>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>>>>>
>>>>>> On 24.08.20 11:29, Kostas Kloudas wrote:
>>>>>>> Thanks a lot for the discussion!
>>>>>>>
>>>>>>> I will open a voting thread shortly!
>>>>>>>
>>>>>>> Kostas
>>>>>>>
>>>>>>> On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas <[hidden email]>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi Guowei,
>>>>>>>>
>>>>>>>> Thanks for the insightful comment!
>>>>>>>>
>>>>>>>> I agree that this can be a limitation of the current runtime, but I
>>>>>>>> think that this FLIP can go on as it discusses mainly the semantics
>>>>>>>> that the DataStream API will expose when applied on bounded data.
>>>>>>>> There will definitely be other FLIPs that will actually handle the
>>>>>>>> runtime-related topics.
>>>>>>>>
>>>>>>>> But it is good to document them nevertheless so that we start soon
>>>>>>>> ironing out the remaining rough edges.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi, Klou
>>>>>>>>>
>>>>>>>>> Thanks for your proposal. It's a very good idea.
>>>>>>>>> Just a little comment about the "Batch vs Streaming Scheduling".
>>>>>>>>> In the AUTOMATIC execution mode maybe we could not pick BATCH
>>>>>>>>> execution mode even if all sources are bounded. For example some
>>>>>>>>> applications would use the `CheckpointListener`, which is not
>>>>>>>>> available in the BATCH mode in current implementation.
>>>>>>>>> So maybe we need more checks in the AUTOMATIC execution mode.
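The extra check suggested here could look roughly like this (purely illustrative; the real decision is made inside Flink's runtime, and the predicate names are invented):

```java
import java.util.List;

class AutomaticModeChooser {
    enum ExecutionMode { BATCH, STREAMING }

    // AUTOMATIC: choose BATCH only if every source is bounded AND the job
    // uses no feature that is unavailable in BATCH mode (e.g. an operator
    // implementing CheckpointListener); otherwise fall back to STREAMING.
    static ExecutionMode choose(List<Boolean> sourcesBounded, boolean usesBatchUnavailableFeature) {
        boolean allBounded = sourcesBounded.stream().allMatch(bounded -> bounded);
        return (allBounded && !usesBatchUnavailableFeature)
                ? ExecutionMode.BATCH
                : ExecutionMode.STREAMING;
    }
}
```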
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Guowei
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas
>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi all,
>>>>>>>>>>
>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>
>>>>>>>>>> @Dawid: "execution.mode" can be a nice alternative and from a
>>>>>>>>>> quick
>>>>>>>>>> look it is not used currently by any configuration option. I will
>>>>>>>>>> update the FLIP accordingly.
>>>>>>>>>>
>>>>>>>>>> @David: Given that having the option to allow timers to fire
>>>>>>>>>> at the
>>>>>>>>>> end of the job is already in the FLIP, I will leave it as is
>>>>>>>>>> and I
>>>>>>>>>> will update the default policy to be "ignore processing time
>>>>>>>>>> timers
>>>>>>>>>> set by the user". This will allow existing DataStream programs
>>>>>>>>>> to run
>>>>>>>>>> on bounded inputs. This update will affect point 2 in the
>>>>>>>>>> "Processing
>>>>>>>>>> Time Support in Batch" section.
>>>>>>>>>>
>>>>>>>>>> If these changes cover your proposals, then I would like to
>>>>>>>>>> start a
>>>>>>>>>> voting thread tomorrow evening if this is ok with you.
>>>>>>>>>>
>>>>>>>>>> Please let me know until then.
>>>>>>>>>>
>>>>>>>>>> Kostas
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson
>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Being able to optionally fire registered processing time timers
>>>>>>>>>>> at the end of a job would be interesting, and would help in (at
>>>>>>>>>>> least some of) the cases I have in mind. I don't have a better
>>>>>>>>>>> idea.
>>>>>>>>>>>
>>>>>>>>>>> David
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas
>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Kurt and David,
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks a lot for the insightful feedback!
>>>>>>>>>>>>
>>>>>>>>>>>> @Kurt: For the topic of checkpointing with Batch Scheduling, I
>>>>>>>>>>>> totally
>>>>>>>>>>>> agree with you that it requires a lot more work and careful
>>>>>>>>>>>> thinking
>>>>>>>>>>>> on the semantics. This FLIP was written under the assumption
>>>>>>>>>>>> that if
>>>>>>>>>>>> the user wants to have checkpoints on bounded input, he/she
>>>>>>>>>>>> will
>>>>>>>>>>>> have
>>>>>>>>>>>> to go with STREAMING as the scheduling mode. Checkpointing for
>>>>>>>>>>>> BATCH
>>>>>>>>>>>> can be handled as a separate topic in the future.
>>>>>>>>>>>>
>>>>>>>>>>>> In the case of MIXED workloads and for this FLIP, the
>>>>>>>>>>>> scheduling
>>>>>>>>>>>> mode
>>>>>>>>>>>> should be set to STREAMING. That is why the AUTOMATIC option
>>>>>>>>>>>> sets
>>>>>>>>>>>> scheduling to BATCH only if all the sources are bounded. I am
>>>>>>>>>>>> not sure
>>>>>>>>>>>> what are the plans there at the scheduling level, as one could
>>>>>>>>>>>> imagine
>>>>>>>>>>>> in the future that in mixed workloads, we schedule first all
>>>>>>>>>>>> the
>>>>>>>>>>>> bounded subgraphs in BATCH mode and we allow only one UNBOUNDED
>>>>>>>>>>>> subgraph per application, which is going to be scheduled after
>>>>>>>>>>>> all
>>>>>>>>>>>> Bounded ones have finished. Essentially the bounded subgraphs
>>>>>>>>>>>> will be
>>>>>>>>>>>> used to bootstrap the unbounded one. But, I am not aware of any
>>>>>>>>>>>> plans
>>>>>>>>>>>> towards that direction.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> @David: The processing time timer handling is a topic that has
>>>>>>>>>>>> also
>>>>>>>>>>>> been discussed in the community in the past, and I do not
>>>>>>>>>>>> remember any
>>>>>>>>>>>> final conclusion unfortunately.
>>>>>>>>>>>>
>>>>>>>>>>>> In the current context and for bounded input, we chose to favor
>>>>>>>>>>>> reproducibility of the result, as this is expected in batch
>>>>>>>>>>>> processing
>>>>>>>>>>>> where the whole input is available in advance. This is why this
>>>>>>>>>>>> proposal suggests to not allow processing time timers. But I
>>>>>>>>>>>> understand your argument that the user may want to be able to
>>>>>>>>>>>> run the
>>>>>>>>>>>> same pipeline on batch and streaming; this is why we added
>>>>>>>>>>>> the two
>>>>>>>>>>>> options under future work, namely (from the FLIP):
>>>>>>>>>>>>
>>>>>>>>>>>> ```
>>>>>>>>>>>> Future Work: In the future we may consider adding as options
>>>>>>>>>>>> the
>>>>>>>>>>>> capability of:
>>>>>>>>>>>> * firing all the registered processing time timers at the
>>>>>>>>>>>> end of
>>>>>>>>>>>> a job
>>>>>>>>>>>> (at close()) or,
>>>>>>>>>>>> * ignoring all the registered processing time timers at the end
>>>>>>>>>>>> of a job.
>>>>>>>>>>>> ```
>>>>>>>>>>>>
>>>>>>>>>>>> Conceptually, we are essentially saying that batch
>>>>>>>>>>>> execution is assumed to be instantaneous and refers to a single
>>>>>>>>>>>> "point" in time and any processing-time timers for the future
>>>>>>>>>>>> may fire
>>>>>>>>>>>> at the end of execution or be ignored (but not throw an
>>>>>>>>>>>> exception). I
>>>>>>>>>>>> could also see ignoring the timers in batch as the default, if
>>>>>>>>>>>> this
>>>>>>>>>>>> makes more sense.
>>>>>>>>>>>>
>>>>>>>>>>>> By the way, do you have any use cases in mind that will help us
>>>>>>>>>>>> better
>>>>>>>>>>>> shape our processing time timer handling?
>>>>>>>>>>>>
>>>>>>>>>>>> Kostas
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson
>>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kostas,
>>>>>>>>>>>>>
>>>>>>>>>>>>> I'm pleased to see some concrete details in this FLIP.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I wonder if the current proposal goes far enough in the
>>>>>>>>>>>>> direction of recognizing the need some users may have for
>>>>>>>>>>>>> "batch" and "bounded streaming" to be treated differently. If
>>>>>>>>>>>>> I've understood it correctly, the section on scheduling allows
>>>>>>>>>>>>> me to choose STREAMING scheduling even if I have bounded
>>>>>>>>>>>>> sources. I like that approach, because it recognizes that even
>>>>>>>>>>>>> though I have bounded inputs, I don't necessarily want batch
>>>>>>>>>>>>> processing semantics. I think it makes sense to extend this
>>>>>>>>>>>>> idea to processing time support as well.
>>>>>>>>>>>>>
>>>>>>>>>>>>> My thinking is that sometimes in development and testing it's
>>>>>>>>>>>>> reasonable to run exactly the same job as in production,
>>>>>>>>>>>>> except
>>>>>>>>>>>>> with different sources and sinks. While it might be a
>>>>>>>>>>>>> reasonable default, I'm not convinced that switching a
>>>>>>>>>>>>> processing time streaming job to read from a bounded source
>>>>>>>>>>>>> should always cause it to fail.
>>>>>>>>>>>>>
>>>>>>>>>>>>> David
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas
>>>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> As described in FLIP-131 [1], we are aiming at deprecating
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> DataSet
>>>>>>>>>>>>>> API in favour of the DataStream API and the Table API. After
>>>>>>>>>>>>>> this work
>>>>>>>>>>>>>> is done, the user will be able to write a program using the
>>>>>>>>>>>>>> DataStream
>>>>>>>>>>>>>> API and this will execute efficiently on both bounded and
>>>>>>>>>>>>>> unbounded
>>>>>>>>>>>>>> data. But before we reach this point, it is worth discussing
>>>>>>>>>>>>>> and
>>>>>>>>>>>>>> agreeing on the semantics of some operations as we transition
>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>> streaming world to the batch one.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This thread and the associated FLIP [2] aim at discussing
>>>>>>>>>>>>>> these issues
>>>>>>>>>>>>>> as these topics are pretty important to users and can lead to
>>>>>>>>>>>>>> unpleasant surprises if we do not pay attention.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Let's have a healthy discussion here and I will be updating
>>>>>>>>>>>>>> the FLIP
>>>>>>>>>>>>>> accordingly.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>


Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

dwysakowicz
Thanks for the update Aljoscha!

I am not sure about the option for ignoring the Triggers. Do you mean to
ignore all the Triggers, including Flink's built-in ones such as CountTrigger,
EventTimeTrigger etc.? Won't that effectively disable the WindowOperator
altogether? Or, even worse, make it unusable with ever-growing state? I
might be wrong here, but aren't Triggers required for emitting results
from the WindowOperator? If I am correct, we emit results only if a Trigger
returns FIRE from one of onElement, onEventTime, onProcessingTime. Why do
you think it does not work well with FAILing hard without this option?
We could fail hard e.g. if WindowAssigner#isEventTime returns false.

As for the question with getProcessingTime(). From my point of view, it
would be safe to simply return the current system time. I cannot think
of any dangers if we do so. Moreover, frankly speaking, I am not entirely
sure what the purpose of the method is, other than injecting a clock in
tests of built-in operators. Maybe it was a mistake to expose it in the
user's API?

Best,

Dawid

On 09/09/2020 14:12, Aljoscha Krettek wrote:

> I updated the FLIP, you can check out the changes here:
> https://cwiki.apache.org/confluence/pages/diffpagesbyversion.action?pageId=158871522&selectedPageVersions=16&selectedPageVersions=15
>
> There is still the open question of what IGNORE means for
> getProcessingTime().
>
> Plus, I introduced a setting for ignoring Triggers because I think it
> otherwise doesn't work well with FAILing hard on processing-time API
> calls. I described it in the FLIP, so please have a look at the diff I
> linked above.
>
> Aljoscha
>
>
> On 08.09.20 11:35, Dawid Wysakowicz wrote:
>>> The only one where I could see that users want different behaviour is
>>> for BATCH jobs on the DataStream API. I agree that processing-time does
>>> not make much sense in batch jobs. However, if users have written some
>>> business logic using processing-time timers their jobs will silently
>>> not work if we set the default to IGNORE. Setting it to FAIL would at
>>> least make users aware that something is not right.
>> I see your point. I was also undecided myself which option to use here.
>> I went with IGNORE for the reason that I thought the common/the most
>> prominent functions should work just out of the box without much
>> additional tweaking. I found the case of "running the same program in
>> BATCH and STREAM" one of such cases and therefore optimized the options
>> for that case. That's why I went with IGNORE instead of FAIL. Again, I am
>> good with either of the two.
>>
>>> I can also see a small group of users wanting processing-time timers
>>> for BATCH. We could, for example, fire all processing-time timers at
>>> the "end of input", then we also set the watermark to +Inf.
>> I agree. I think this case would be covered with ENABLE + TRIGGER. I do
>> agree though it makes sense to mention this case explicitly as the
>> ENABLE option would behave slightly different in BATCH than in STREAM.
>> Maybe not strictly speaking different, but it would be worth explaining
>> anyway. The way I heard from some people you can think of BATCH
>> processing happening instantaneously in processing time. Therefore there
>> can be no timers triggered in between records. In BATCH processing the
>> only time when timers can be triggered is at the end of input. Or at
>> least that is how I see it.
>>
>>> Another thing is: what should we do with new triggers that are set
>>> after the end-of-input. If we have TRIGGER and users keep setting new
>> processing-time timers in the callback, would we continue firing them?
>> Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off
>>> remaining timers but don't add new ones? Do we silently ignore adding
>>> new ones?
>> My take on this issue is that it should be good enough to have the
>> QUIESCE_AND_TRIGGER behaviour with ignoring timers registered after the
>> end of input. We cannot fail hard in such a scenario, unless we expose a
>> flag saying the timer is after the end of input. Otherwise I cannot see
>> a way to correctly safeguard against this scenario. I can see some use
>> cases that would benefit from allowing the timers registration, e.g.
>> periodically checking if some external process finished. In my opinion
>> this is a bit of a different topic, as it is actually an issue of
>> inverting the control when an operator can finish. Right now it is the
>> task that decides that the job/operator finishes at the end of input.
>>
>>> By the way, I assume WAIT means we wait for processing-time to
>>> actually reach the time of pending timers? Or did you have something
>>> else in mind with this?
>> Yes, that's what I meant. I actually took the options from this
>> issue[1], where there is some discussion on that topic as well.
>>
>> Best,
>>
>> Dawid
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>
>> On 08/09/2020 10:57, Aljoscha Krettek wrote:
>>> I agree with almost all of your points!
>>>
>>> The only one where I could see that users want different behaviour is
>>> for BATCH jobs on the DataStream API. I agree that processing-time does
>>> not make much sense in batch jobs. However, if users have written some
>>> business logic using processing-time timers their jobs will silently
>>> not work if we set the default to IGNORE. Setting it to FAIL would at
>>> least make users aware that something is not right.
>>>
>>> I can also see a small group of users wanting processing-time timers
>>> for BATCH. We could, for example, fire all processing-time timers at
>>> the "end of input", then we also set the watermark to +Inf.
>>>
>>> Another thing is: what should we do with new triggers that are set
>>> after the end-of-input. If we have TRIGGER and users keep setting new
>>> processing-time timers in the callback, would we continue firing them?
>>> Or should the behaviour be QUIESCE_AND_TRIGGER, where we work off
>>> remaining timers but don't add new ones? Do we silently ignore adding
>>> new ones?
>>>
>>> By the way, I assume WAIT means we wait for processing-time to
>>> actually reach the time of pending timers? Or did you have something
>>> else in mind with this?
>>>
>>> Aljoscha
>>>
>>> On 08.09.20 09:19, Dawid Wysakowicz wrote:
>>>> Hey Aljoscha
>>>>
>>>> A couple of thoughts for the two remaining TODOs in the doc:
>>>>
>>>> # Processing Time Support in BATCH/BOUNDED execution mode
>>>>
>>>> I think there are two somewhat orthogonal problems around this topic:
>>>>     1. Firing processing timers at the end of the job
>>>>     2. Having processing timers in the BATCH mode
>>>> The way I see it there are three main use cases for different
>>>> combinations of the aforementioned dimensions:
>>>>     1. Regular streaming jobs: STREAM mode with UNBOUNDED sources
>>>>        - we do want to have processing timers
>>>>        - there is no end of the job
>>>>     2. Debugging/Testing streaming jobs: STREAM mode with BOUNDED sources
>>>>        - we do want to have processing timers
>>>>        - we want to fire/wait for the timers at the end
>>>>     3. Batch jobs with the DataStream API:
>>>>        - we do **NOT** want to have processing timers, either during
>>>> processing or at the end. We want to either fail hard or ignore the
>>>> timers. Generally speaking, in BATCH mode the processing timers do not
>>>> make sense, therefore it would be better to fail hard. It would be the
>>>> safest option, as some of the user logic might depend on the processing
>>>> timers. Failing hard would give the user the opportunity to react to the
>>>> changed behaviour. On the other hand, if we want to make it possible to
>>>> run the exact same program both in STREAM and BATCH mode, we must have an
>>>> option to simply ignore processing timers.
>>>>        - we never want to actually trigger the timers, neither during
>>>> runtime nor at the end
>>>>
>>>> Having the above in mind, I am thinking if we should introduce two
>>>> separate options:
>>>>      * processing-time.timers = ENABLE/FAIL/IGNORE
>>>>      * processing-time.on-end-of-input = CANCEL/WAIT/TRIGGER
>>>> With the two options we can satisfy all the above cases. The default
>>>> settings would be:
>>>> STREAM:
>>>>     processing-time.timers = ENABLE
>>>>     processing-time.on-end-of-input = TRIGGER
>>>> BATCH:
>>>>     processing-time.timers = IGNORE
>>>>     processing-time.on-end-of-input = CANCEL
>>>>
>>>> # Event time triggers
>>>> I can say that only from the implementation perspective, but I find
>>>> it hard to
>>>> actually ignore the event-time triggers. We would have to adjust the
>>>> implementation of WindowOperator to do that. At the same time I see no
>>>> problem with simply keeping them working. I am wondering if we
>>>> should/could just leave them as they are.
>>>>
>>>> # Broadcast State
>>>> As far as I am concerned there are no core semantic problems with
>>>> the
>>>> Broadcast State. As of now, it does not give any guarantees about the
>>>> order in which the broadcast and non-broadcast sides are executed even
>>>> in streaming. It also does not expose any mechanisms to implement an
>>>> event/processing-time alignments (you cannot register timers in the
>>>> broadcast side). I can't see any of the guarantees breaking in the
>>>> BATCH
>>>> mode.
>>>> I do agree it would give somewhat nicer properties in BATCH if we
>>>> consumed the broadcast side first. It would make the operation
>>>> deterministic and let users implement a broadcast join properly on top
>>>> of this method. Nevertheless I see it as an extension of the
>>>> DataStream
>>>> API for BATCH execution rather than making the DataStream API work for
>>>> BATCH. Therefore I'd be fine with leaving the Broadcast State out
>>>> of the FLIP.
>>>>
>>>> What do you think?
>>>>
>>>> On 01/09/2020 13:46, Aljoscha Krettek wrote:
>>>>> Hmm, it seems I left out the Dev ML in my mail. Looping that back
>>>>> in..
>>>>>
>>>>>
>>>>> On 28.08.20 13:54, Dawid Wysakowicz wrote:
>>>>>> @Aljoscha Let me bring back to the ML some of the points we
>>>>>> discussed
>>>>>> offline.
>>>>>>
>>>>>> Ad. 1 Yes I agree it's not just about scheduling. It includes more
>>>>>> changes to the runtime. We might need to make it more prominent
>>>>>> in the
>>>>>> write up.
>>>>>>
>>>>>> Ad. 2 You have a good point here that switching the default value
>>>>>> for
>>>>>> TimeCharacteristic to INGESTION time might not be the best option
>>>>>> as it
>>>>>> might hide problems if we assign ingestion time, which is rarely the
>>>>>> right
>>>>>> choice for user programs. Maybe we could just go with the
>>>>>> EVENT_TIME as
>>>>>> the default?
>>>>>>
>>>>>> Ad. 4 That's a very good point! I do agree with you it would be
>>>>>> better
>>>>>> to change the behaviour of said methods for batch-style execution.
>>>>>> Even
>>>>>> though it changes the behaviour, the overall logic is still correct.
>>>>>> Moreover I'd also recommend deprecating some of the relational-like
>>>>>> methods, which we should rather redirect to the Table API. I added a
>>>>>> section about it to the FLIP (mostly copying over your message).
>>>>>> Let me
>>>>>> know what you think about it.
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Dawid
>>>>>>
>>>>>> On 25/08/2020 11:39, Aljoscha Krettek wrote:
>>>>>>> Thanks for creating this FLIP! I think the general direction is
>>>>>>> very
>>>>>>> good but I think there are some specifics that we should also
>>>>>>> put in
>>>>>>> there and that we may need to discuss here as well.
>>>>>>>
>>>>>>> ## About batch vs streaming scheduling
>>>>>>>
>>>>>>> I think we shouldn't call it "scheduling", because the decision
>>>>>>> between bounded and unbounded affects more than just scheduling. It
>>>>>>> affects how we do network transfers and the semantics of time,
>>>>>>> among
>>>>>>> other things. So maybe we should differentiate between batch-style
>>>>>>> and
>>>>>>> streaming-style execution, though I'm not sure I like those terms
>>>>>>> either.
>>>>>>>
>>>>>>> ## About processing-time support in batch
>>>>>>>
>>>>>>> It's not just about "batch": changing the default to ingestion
>>>>>>> time is
>>>>>>> a change for stream processing as well. Actually, I don't know if
>>>>>>> ingestion time even makes sense for batch processing. IIRC, with
>>>>>>> the
>>>>>>> new sources we actually always have a timestamp, so this discussion
>>>>>>> might be moot. Maybe Becket and/or Stephan (cc'ed) could chime
>>>>>>> in on
>>>>>>> this.
>>>>>>>
>>>>>>> Also, I think it's right that we currently ignore processing-time
>>>>>>> timers at the end of input in streaming jobs, but this has been a
>>>>>>> source of trouble for users. See [1] and several discussions on the
>>>>>>> ML. I'm also cc'ing Flavio here who also ran into this problem. I
>>>>>>> think we should solve this quickly after laying the foundations of
>>>>>>> bounded processing on the DataStream API.
>>>>>>>
>>>>>>> ## About broadcast state support
>>>>>>>
>>>>>>> I think as a low-hanging fruit we could just read the broadcast
>>>>>>> side
>>>>>>> first and then switch to the regular input. We do need to be
>>>>>>> careful
>>>>>>> with creating distributed deadlocks, though, so this might be
>>>>>>> trickier
>>>>>>> than it seems at first.
>>>>>>>
>>>>>>> ## Loose ends and weird semantics
>>>>>>>
>>>>>>> There are some operations in the DataStream API that have semantics
>>>>>>> that might make sense for stream processing but should behave
>>>>>>> differently for batch. For example, KeyedStream.reduce() is
>>>>>>> essentially a reduce on a GlobalWindow with a Trigger that fires on
>>>>>>> every element. In DB terms it produces an UPSERT stream as an
>>>>>>> output,
>>>>>>> if you get ten input elements for a key you also get ten output
>>>>>>> records. For batch processing it might make more sense to instead
>>>>>>> only
>>>>>>> produce one output record per key with the result of the
>>>>>>> aggregation.
>>>>>>> This would be correct for downstream consumers that expect an
>>>>>>> UPSERT
>>>>>>> stream but it would change the actual physical output stream that
>>>>>>> they
>>>>>>> see.
>>>>>>>
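The KeyedStream.reduce() difference can be sketched in plain Python (a simulation of the semantics described above, not the Flink API):

```python
# Simulation of KeyedStream.reduce() semantics; not the Flink API.
def streaming_reduce(pairs, reduce_fn):
    """STREAMING: fire on every element, producing an UPSERT stream."""
    state, out = {}, []
    for k, v in pairs:
        state[k] = v if k not in state else reduce_fn(state[k], v)
        out.append((k, state[k]))  # one output record per input record
    return out

def batch_reduce(pairs, reduce_fn):
    """BATCH: emit one final record per key at the end of input."""
    state = {}
    for k, v in pairs:
        state[k] = v if k not in state else reduce_fn(state[k], v)
    return sorted(state.items())

data = [("a", 1), ("a", 2), ("b", 5), ("a", 3)]
print(streaming_reduce(data, lambda x, y: x + y))
# [('a', 1), ('a', 3), ('b', 5), ('a', 6)]  -- four records, the upserts
print(batch_reduce(data, lambda x, y: x + y))
# [('a', 6), ('b', 5)]                      -- one final record per key
```

Both variants compute the same per-key aggregate; only the physical output stream that downstream consumers see differs.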
>>>>>>> There might be other such operations in the DataStream API that
>>>>>>> have
>>>>>>> slightly weird behaviour that doesn't make much sense when you do
>>>>>>> bounded processing.
>>>>>>>
>>>>>>> Best,
>>>>>>> Aljoscha
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>>>>>>
>>>>>>> On 24.08.20 11:29, Kostas Kloudas wrote:
>>>>>>>> Thanks a lot for the discussion!
>>>>>>>>
>>>>>>>> I will open a voting thread shortly!
>>>>>>>>
>>>>>>>> Kostas
>>>>>>>>
>>>>>>>> On Mon, Aug 24, 2020 at 9:46 AM Kostas Kloudas
>>>>>>>> <[hidden email]>
>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> Hi Guowei,
>>>>>>>>>
>>>>>>>>> Thanks for the insightful comment!
>>>>>>>>>
>>>>>>>>> I agree that this can be a limitation of the current runtime,
>>>>>>>>> but I
>>>>>>>>> think that this FLIP can go on as it discusses mainly the
>>>>>>>>> semantics
>>>>>>>>> that the DataStream API will expose when applied on bounded data.
>>>>>>>>> There will definitely be other FLIPs that will actually handle
>>>>>>>>> the
>>>>>>>>> runtime-related topics.
>>>>>>>>>
>>>>>>>>> But it is good to document them nevertheless so that we start
>>>>>>>>> soon
>>>>>>>>> ironing out the remaining rough edges.
>>>>>>>>>
>>>>>>>>> Cheers,
>>>>>>>>> Kostas
>>>>>>>>>
>>>>>>>>> On Mon, Aug 24, 2020 at 9:16 AM Guowei Ma <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi, Klou
>>>>>>>>>>
>>>>>>>>>> Thanks for your proposal. It's a very good idea.
>>>>>>>>>> Just a little comment about the "Batch vs Streaming Scheduling".
>>>>>>>>>> In the AUTOMATIC execution mode, we may not be able to pick BATCH
>>>>>>>>>> execution mode even if all sources are bounded. For example, some
>>>>>>>>>> applications use the `CheckpointListener`, which is not
>>>>>>>>>> available in BATCH mode in the current implementation.
>>>>>>>>>> So maybe we need more checks in the AUTOMATIC execution mode.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Guowei
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Aug 20, 2020 at 10:27 PM Kostas Kloudas
>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi all,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the comments!
>>>>>>>>>>>
>>>>>>>>>>> @Dawid: "execution.mode" can be a nice alternative and from a
>>>>>>>>>>> quick
>>>>>>>>>>> look it is not used currently by any configuration option. I
>>>>>>>>>>> will
>>>>>>>>>>> update the FLIP accordingly.
>>>>>>>>>>>
>>>>>>>>>>> @David: Given that having the option to allow timers to fire
>>>>>>>>>>> at the
>>>>>>>>>>> end of the job is already in the FLIP, I will leave it as is
>>>>>>>>>>> and I
>>>>>>>>>>> will update the default policy to be "ignore processing time
>>>>>>>>>>> timers
>>>>>>>>>>> set by the user". This will allow existing dataStream programs
>>>>>>>>>>> to run
>>>>>>>>>>> on bounded inputs. This update will affect point 2 in the
>>>>>>>>>>> "Processing
>>>>>>>>>>> Time Support in Batch" section.
>>>>>>>>>>>
>>>>>>>>>>> If these changes cover your proposals, then I would like to
>>>>>>>>>>> start a
>>>>>>>>>>> voting thread tomorrow evening if this is ok with you.
>>>>>>>>>>>
>>>>>>>>>>> Please let me know until then.
>>>>>>>>>>>
>>>>>>>>>>> Kostas
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Aug 18, 2020 at 3:54 PM David Anderson
>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Being able to optionally fire registered processing time
>>>>>>>>>>>> timers
>>>>>>>>>>>> at the end of a job would be interesting, and would help in
>>>>>>>>>>>> (at
>>>>>>>>>>>> least some of) the cases I have in mind. I don't have a better
>>>>>>>>>>>> idea.
>>>>>>>>>>>>
>>>>>>>>>>>> David
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas
>>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Kurt and David,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks a lot for the insightful feedback!
>>>>>>>>>>>>>
>>>>>>>>>>>>> @Kurt: For the topic of checkpointing with Batch
>>>>>>>>>>>>> Scheduling, I
>>>>>>>>>>>>> totally
>>>>>>>>>>>>> agree with you that it requires a lot more work and careful
>>>>>>>>>>>>> thinking
>>>>>>>>>>>>> on the semantics. This FLIP was written under the assumption
>>>>>>>>>>>>> that if
>>>>>>>>>>>>> the user wants to have checkpoints on bounded input, he/she
>>>>>>>>>>>>> will
>>>>>>>>>>>>> have
>>>>>>>>>>>>> to go with STREAMING as the scheduling mode. Checkpointing
>>>>>>>>>>>>> for
>>>>>>>>>>>>> BATCH
>>>>>>>>>>>>> can be handled as a separate topic in the future.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the case of MIXED workloads and for this FLIP, the
>>>>>>>>>>>>> scheduling
>>>>>>>>>>>>> mode
>>>>>>>>>>>>> should be set to STREAMING. That is why the AUTOMATIC option
>>>>>>>>>>>>> sets
>>>>>>>>>>>>> scheduling to BATCH only if all the sources are bounded. I am
>>>>>>>>>>>>> not sure
>>>>>>>>>>>>> what are the plans there at the scheduling level, as one
>>>>>>>>>>>>> could
>>>>>>>>>>>>> imagine
>>>>>>>>>>>>> in the future that in mixed workloads, we schedule first all
>>>>>>>>>>>>> the
>>>>>>>>>>>>> bounded subgraphs in BATCH mode and we allow only one
>>>>>>>>>>>>> UNBOUNDED
>>>>>>>>>>>>> subgraph per application, which is going to be scheduled
>>>>>>>>>>>>> after
>>>>>>>>>>>>> all
>>>>>>>>>>>>> Bounded ones have finished. Essentially the bounded subgraphs
>>>>>>>>>>>>> will be
>>>>>>>>>>>>> used to bootstrap the unbounded one. But, I am not aware
>>>>>>>>>>>>> of any
>>>>>>>>>>>>> plans
>>>>>>>>>>>>> towards that direction.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> @David: The processing time timer handling is a topic that
>>>>>>>>>>>>> has
>>>>>>>>>>>>> also
>>>>>>>>>>>>> been discussed in the community in the past, and I do not
>>>>>>>>>>>>> remember any
>>>>>>>>>>>>> final conclusion unfortunately.
>>>>>>>>>>>>>
>>>>>>>>>>>>> In the current context and for bounded input, we chose to
>>>>>>>>>>>>> favor
>>>>>>>>>>>>> reproducibility of the result, as this is expected in batch
>>>>>>>>>>>>> processing
>>>>>>>>>>>>> where the whole input is available in advance. This is why
>>>>>>>>>>>>> this
>>>>>>>>>>>>> proposal suggests to not allow processing time timers. But I
>>>>>>>>>>>>> understand your argument that the user may want to be able to
>>>>>>>>>>>>> run the
>>>>>>>>>>>>> same pipeline on batch and streaming, which is why we added
>>>>>>>>>>>>> the two
>>>>>>>>>>>>> options under future work, namely (from the FLIP):
>>>>>>>>>>>>>
>>>>>>>>>>>>> ```
>>>>>>>>>>>>> Future Work: In the future we may consider adding as options
>>>>>>>>>>>>> the
>>>>>>>>>>>>> capability of:
>>>>>>>>>>>>> * firing all the registered processing time timers at the
>>>>>>>>>>>>> end of
>>>>>>>>>>>>> a job
>>>>>>>>>>>>> (at close()) or,
>>>>>>>>>>>>> * ignoring all the registered processing time timers at
>>>>>>>>>>>>> the end
>>>>>>>>>>>>> of a job.
>>>>>>>>>>>>> ```
>>>>>>>>>>>>>
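These two options can be sketched in plain Python (a simulation only; the policy names FIRE_AT_END and IGNORE are illustrative, not real Flink settings):

```python
# Simulation of the two proposed end-of-input policies for pending
# processing-time timers; FIRE_AT_END / IGNORE are illustrative names.
def drain_pending_timers(pending, policy):
    """pending: list of (timestamp, callback) still registered at close()."""
    if policy == "FIRE_AT_END":
        # fire everything that is still registered, in timestamp order
        return [cb(ts) for ts, cb in sorted(pending, key=lambda t: t[0])]
    if policy == "IGNORE":
        return []  # silently drop the remaining timers; never throw
    raise ValueError(f"unknown policy: {policy}")

pending = [(30, lambda ts: f"fired@{ts}"), (10, lambda ts: f"fired@{ts}")]
print(drain_pending_timers(pending, "FIRE_AT_END"))  # ['fired@10', 'fired@30']
print(drain_pending_timers(pending, "IGNORE"))       # []
```

In both cases a timer registered for the future never raises an exception at end of input, matching the text above.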
>>>>>>>>>>>>> Conceptually, we are essentially saying that batch
>>>>>>>>>>>>> execution is assumed to be instantaneous and refers to a
>>>>>>>>>>>>> single
>>>>>>>>>>>>> "point" in time and any processing-time timers for the future
>>>>>>>>>>>>> may fire
>>>>>>>>>>>>> at the end of execution or be ignored (but not throw an
>>>>>>>>>>>>> exception). I
>>>>>>>>>>>>> could also see ignoring the timers in batch as the
>>>>>>>>>>>>> default, if
>>>>>>>>>>>>> this
>>>>>>>>>>>>> makes more sense.
>>>>>>>>>>>>>
>>>>>>>>>>>>> By the way, do you have any usecases in mind that will
>>>>>>>>>>>>> help us
>>>>>>>>>>>>> better
>>>>>>>>>>>>> shape our processing time timer handling?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Aug 17, 2020 at 2:52 PM David Anderson
>>>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Kostas,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I'm pleased to see some concrete details in this FLIP.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I wonder if the current proposal goes far enough in the
>>>>>>>>>>>>>> direction of recognizing the need some users may have for
>>>>>>>>>>>>>> "batch" and "bounded streaming" to be treated
>>>>>>>>>>>>>> differently. If
>>>>>>>>>>>>>> I've understood it correctly, the section on scheduling
>>>>>>>>>>>>>> allows
>>>>>>>>>>>>>> me to choose STREAMING scheduling even if I have bounded
>>>>>>>>>>>>>> sources. I like that approach, because it recognizes that
>>>>>>>>>>>>>> even
>>>>>>>>>>>>>> though I have bounded inputs, I don't necessarily want batch
>>>>>>>>>>>>>> processing semantics. I think it makes sense to extend this
>>>>>>>>>>>>>> idea to processing time support as well.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> My thinking is that sometimes in development and testing
>>>>>>>>>>>>>> it's
>>>>>>>>>>>>>> reasonable to run exactly the same job as in production,
>>>>>>>>>>>>>> except
>>>>>>>>>>>>>> with different sources and sinks. While it might be a
>>>>>>>>>>>>>> reasonable default, I'm not convinced that switching a
>>>>>>>>>>>>>> processing time streaming job to read from a bounded source
>>>>>>>>>>>>>> should always cause it to fail.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> David
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Aug 12, 2020 at 5:22 PM Kostas Kloudas
>>>>>>>>>>>>>> <[hidden email]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Hi all,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> As described in FLIP-131 [1], we are aiming at deprecating
>>>>>>>>>>>>>>> the
>>>>>>>>>>>>>>> DataSet
>>>>>>>>>>>>>>> API in favour of the DataStream API and the Table API.
>>>>>>>>>>>>>>> After
>>>>>>>>>>>>>>> this work
>>>>>>>>>>>>>>> is done, the user will be able to write a program using the
>>>>>>>>>>>>>>> DataStream
>>>>>>>>>>>>>>> API and this will execute efficiently on both bounded and
>>>>>>>>>>>>>>> unbounded
>>>>>>>>>>>>>>> data. But before we reach this point, it is worth
>>>>>>>>>>>>>>> discussing
>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> agreeing on the semantics of some operations as we
>>>>>>>>>>>>>>> transition
>>>>>>>>>>>>>>> from the
>>>>>>>>>>>>>>> streaming world to the batch one.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> This thread and the associated FLIP [2] aim at discussing
>>>>>>>>>>>>>>> these issues
>>>>>>>>>>>>>>> as these topics are pretty important to users and can
>>>>>>>>>>>>>>> lead to
>>>>>>>>>>>>>>> unpleasant surprises if we do not pay attention.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Let's have a healthy discussion here and I will be updating
>>>>>>>>>>>>>>> the FLIP
>>>>>>>>>>>>>>> accordingly.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Cheers,
>>>>>>>>>>>>>>> Kostas
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>>> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158871522
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Aljoscha Krettek-2
On 10.09.20 11:30, Dawid Wysakowicz wrote:
> I am not sure about the option for ignoring the Triggers. Do you mean to
> ignore all the Triggers, including Flink's built-in ones such as CountTrigger,
> EventTimeTrigger, etc.? Won't it effectively disable the WindowOperator
> altogether? Or, even worse, make it unusable with ever-growing state? I
> might be wrong here, but aren't Triggers required for emitting results
> from the WindowOperator? If I am correct, we emit results only if a Trigger
> returns FIRE from one of onElement, onEventTime, onProcessingTime. Why do
> you think it does not work well with FAILing hard without this option?
> We could fail hard e.g. if WindowAssigner#isEventTime returns false.

The problem I'm trying to solve is mixed Triggers. Say you have a
Trigger that does "fire when watermark passes maxTimestamp() but also
fire every 5 minutes in processing time and when the watermark passes
maxTimestamp() fire for every 5 new records". This is something that the
Beam API for example allows users to specify and is something that I
think is potentially valuable in the real world.

Ignoring Triggers would mean that we always fire on the maxTimestamp()
by hardcoding this in a WindowOperator that we use for BATCH execution.
With this, the WindowAssigner becomes the only thing that changes. This
is similar to how Beam treats windows, where the WindowAssigner carries
semantic content but the Trigger is only for optimizing streaming
emission, which you don't need for BATCH where you always have a
"perfect watermark".
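In plain Python, "only the WindowAssigner matters in BATCH" could look like this (a simulation of the semantics, not the actual WindowOperator):

```python
# Simulation: in BATCH, windows fire exactly once at maxTimestamp,
# driven only by the WindowAssigner; Triggers are never consulted.
def assign_tumbling_window(ts, size):
    start = ts - (ts % size)
    return (start, start + size)  # window covers [start, start + size)

def batch_window_sum(events, size):
    """events: (timestamp, value) pairs; returns one result per window."""
    windows = {}
    for ts, v in events:
        w = assign_tumbling_window(ts, size)
        windows[w] = windows.get(w, 0) + v
    # bounded input gives a "perfect watermark": emit each window once,
    # as if the watermark just passed its maxTimestamp
    return dict(sorted(windows.items()))

events = [(1, 10), (3, 20), (7, 5), (12, 1)]
print(batch_window_sum(events, 5))
# {(0, 5): 30, (5, 10): 5, (10, 15): 1}
```

Note there is no trigger argument at all: window boundaries come from the assigner, and each window produces exactly one result.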

Coming back to the initial example, such a Trigger would not work if we
FAIL hard for processing-time on BATCH, which I'm suggesting because we
otherwise have potentially surprising results if business logic depends
on processing-time timers. For Windows, on the other hand, we could get
around it by agreeing that Triggers are ignored for BATCH.

> As for the question with getProcessingTime(). From my point of view, it
> would be safe to simply return the current system time. I cannot think
> of any dangers if we do so. Moreover, frankly speaking I am not entirely
> sure what is the purpose of the method, other than injecting a clock in
> tests of built-in operators. Maybe it was a mistake to expose it in the
> user's API?

I agree, it was a mistake to expose getProcessingTime(). And I also
think the same about getCurrentWatermark(), but that's neither here nor
there. 😅 I then also agree to just return the current time, as you
said. I will change the FLIP for this.

Aljoscha

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

dwysakowicz
Thanks for the explanation! Makes sense now! What do you think about
adding this behaviour of WindowAssigner in streaming mode as well? I
mean the behaviour of emitting at the end of a Window. I think it would
make sense in the STREAM mode as well and keep the two modes more aligned.

Best,

Dawid



Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

David Anderson-3
Having just re-read FLIP-134, I think it mostly makes sense, though I'm not
exactly looking forward to figuring out how to explain it without making it
seem overly complicated.

A few points:

I'm a bit confused by the discussion around custom window Triggers. Yes, I
agree that complex, mixed Triggers are sometimes useful. And I buy into the
argument that we want to FAIL hard for processing-time on BATCH. But why
not go ahead and FAIL Triggers that can't work, rather than ignoring all
custom Triggers?

A BIG +1 from me for deprecating timeWindow.

I do think it's critical that bounded streaming has the same configuration
as unbounded streaming. Users expect/need things like processing time
timers in bounded streaming during development. If I've understood the
proposal correctly, this will be the case.

I would prefer WARN over IGNORE as the default for cases where users have
explicitly specified something that isn't going to happen. (I would also
like to see a warning given for any job that uses event time timers without
having a watermark strategy, though that's unrelated to the topic at hand.)

David

On Thu, Sep 10, 2020 at 2:57 PM Dawid Wysakowicz <[hidden email]>
wrote:

> Thanks for the explanation! Makes sense now! What do you think about
> adding this behaviour of WindowAssigner in streaming mode as well? I
> mean the behaviour of emitting at the end of a Window. I think it would
> make sense in the STREAM mode as well and keep the two modes more aligned.
>
> Best,
>
> Dawid

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

Aljoscha Krettek-2
Thanks for the thoughtful comments! I'll try and address them inline
below. I'm hoping to start a VOTE thread soon if there are no other
comments by the end of today.

On 10.09.20 15:40, David Anderson wrote:
> Having just re-read FLIP-134, I think it mostly makes sense, though I'm not
> exactly looking forward to figuring out how to explain it without making it
> seem overly complicated.

What are the points where you see the explanation could become too
complex? For me, the only difference in behaviour is processing-time
timers, which will fail hard in BATCH execution mode. Things like
shuffle-mode and schedule-mode should be transparent and I would not
mention them in the documentation except in an advanced section.

> I'm a bit confused by the discussion around custom window Triggers. Yes, I
> agree that complex, mixed Triggers are sometimes useful. And I buy into the
> argument that we want to FAIL hard for processing-time on BATCH. But why
> not go ahead and FAIL Triggers that can't work, rather than ignoring all
> custom Triggers?

The motivation is to allow the same program to work on BATCH and on
STREAMING, and in reality DataStream programs often have Triggers that
you wouldn't need for BATCH execution.

I do think that this topic is too important to have it as a sub-section
in this FLIP. I will remove it and write another FLIP just about this
topic. This will mean that DataStream programs that have Triggers that
use processing-time will simply fail hard. Which is acceptable for an
initial version, I think.

> I do think it's critical that bounded streaming has the same configuration
> as unbounded streaming. Users expect/need things like processing time
> timers in bounded streaming during development. If I've understood the
> proposal correctly, this will be the case.

If you're referring to the case where you have STREAMING execution mode
but your sources are bounded (for development), then yes, I think we're
on the same page.

> I would prefer WARN over IGNORE as the default for cases where users have
> explicitly specified something that isn't going to happen. (I would also
> like to see a warning given for any job that uses event time timers without
> having a watermark strategy, though that's unrelated to the topic at hand.)

Agreed, that's why I'm proposing pipeline.processing-time.allow: FAIL as
the default setting for BATCH execution mode. Is there another setting
where we currently propose IGNORE but you think it should be FAIL? There
is pipeline.processing-time.end-of-input: IGNORE, which is in line with
the current behaviour, and failing when timers are set means there won't
be any to fire in BATCH execution mode.
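For reference, the settings discussed in this thread would look roughly like this in the configuration (option names as proposed in the FLIP and this discussion; values illustrative and subject to change):

```yaml
# Proposed options from this thread; names and defaults may still change.
execution.mode: BATCH                          # or STREAMING / AUTOMATIC
pipeline.processing-time.allow: FAIL           # proposed default for BATCH
pipeline.processing-time.end-of-input: IGNORE  # matches current streaming behaviour
```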

Aljoscha
