Change Streaming Source Function Interface

Stephan Ewen
Hi all!

I think we need to change the interface of the streaming source function.

The function currently has just a run() method, in which it does its work
until canceled.

With this, it is hard to write sources where the state and the snapshot
barriers are exactly aligned.
When performing the checkpoint, the vertex grabs the state from the
source and injects a checkpoint barrier. Nothing guarantees that the injected
barrier aligns with the state: the source may have emitted more
records since the state was grabbed, or may not yet have emitted the record
that is reflected in the state (offset).

If we change the interface to a more iterator-like one (hasNext() and
next()), then the vertex calls these methods and can checkpoint in between
the calls.
The point right after hasNext() returns is well defined: there, the state can
be grabbed and the barrier emitted.
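
To make the idea concrete, here is a rough sketch of what I have in mind. All names (IterableSource, CountingSource, snapshotState(), drive()) are made up for illustration and are not the existing interface. The driver loop stands in for the vertex and takes the snapshot right after hasNext(), so the state in each barrier matches exactly the records emitted before it.

```java
import java.util.ArrayList;
import java.util.List;

public class IteratorSourceSketch {

    /** Hypothetical iterator-like source; not the actual streaming interface. */
    interface IterableSource<T> {
        boolean hasNext();
        T next();
        long snapshotState(); // here simply an offset
    }

    /** Toy source that emits 0..limit-1 and keeps its offset as state. */
    static class CountingSource implements IterableSource<Long> {
        private final long limit;
        private long offset = 0;

        CountingSource(long limit) { this.limit = limit; }

        public boolean hasNext() { return offset < limit; }
        public Long next() { return offset++; }
        public long snapshotState() { return offset; }
    }

    /**
     * Stand-in for the vertex: it alone calls hasNext()/next(), so it can
     * checkpoint between the calls without any locking.
     */
    static List<String> drive(IterableSource<Long> source, int checkpointEvery) {
        List<String> output = new ArrayList<>();
        int sinceCheckpoint = 0;
        while (source.hasNext()) {
            // Well-defined point: nothing is emitted while we snapshot.
            if (sinceCheckpoint == checkpointEvery) {
                output.add("barrier(state=" + source.snapshotState() + ")");
                sinceCheckpoint = 0;
            }
            output.add("record(" + source.next() + ")");
            sinceCheckpoint++;
        }
        return output;
    }

    public static void main(String[] args) {
        drive(new CountingSource(5), 2).forEach(System.out::println);
    }
}
```

In this toy run, every barrier carries an offset equal to the number of records emitted before it, which is the alignment we are after.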


Any opinions on that?


Stephan

Re: Change Streaming Source Function Interface

Gyula Fóra
Hi,

The only thing we need to guarantee is that the source does not output any
records or update the state while we take the snapshot and send the
barrier. I guess there are multiple ways of doing this. For instance, we
could simply lock on these objects, or add the methods you proposed. If we
lock, we can ensure that no user thread finds a way around next() and
hasNext() (which would otherwise cause problems), and we can also keep
the current interface.

I think we just need to figure out which user interface is preferable
for sources: simple run() and cancel() methods, or next(), hasNext(),
etc. Or we could just support both.
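
As a rough sketch of the locking variant (all names here, such as CountingSource, runWithCheckpoints() and aligned(), are invented for illustration and are not existing API): the source keeps its run() loop but emits and updates its state inside a lock, and the checkpointing thread takes the same lock to read the state and inject the barrier.

```java
import java.util.ArrayList;
import java.util.List;

public class LockedRunSourceSketch {

    /** Hypothetical run()-style source that emits and updates state under a shared lock. */
    static class CountingSource {
        private final long limit;
        private long offset = 0; // state, only accessed while holding the lock
        private volatile boolean running = true;

        CountingSource(long limit) { this.limit = limit; }

        void run(Object lock, List<String> output) {
            while (running) {
                synchronized (lock) {
                    if (offset >= limit) {
                        return;
                    }
                    // Emit and state update happen atomically w.r.t. checkpoints.
                    output.add("record(" + offset + ")");
                    offset++;
                }
            }
        }

        void cancel() { running = false; }

        long snapshotState() { return offset; } // caller must hold the lock
    }

    /** Runs the source in its own thread while this thread injects barriers. */
    static List<String> runWithCheckpoints(long records, int checkpoints) {
        Object lock = new Object();
        List<String> output = new ArrayList<>(); // only modified while holding the lock
        CountingSource source = new CountingSource(records);
        Thread sourceThread = new Thread(() -> source.run(lock, output));
        sourceThread.start();
        for (int i = 0; i < checkpoints; i++) {
            synchronized (lock) {
                output.add("barrier(state=" + source.snapshotState() + ")");
            }
            Thread.yield();
        }
        try {
            sourceThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return output;
    }

    /** True if every barrier's state equals the number of records emitted before it. */
    static boolean aligned(List<String> output) {
        long seen = 0;
        for (String entry : output) {
            if (entry.startsWith("record(")) {
                seen++;
            } else if (!entry.equals("barrier(state=" + seen + ")")) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(aligned(runWithCheckpoints(1000, 5)));
    }
}
```

Where the barriers land differs from run to run, but as long as both sides use the same lock, each barrier lines up with the state.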

Gyula

On Thu, Apr 30, 2015 at 8:30 PM, Stephan Ewen <[hidden email]> wrote:

> Hi all!
>
> I think we need to change the interface of the streaming source function.
>
> The function currently has simply a run() method where it does its work,
> until canceled.
>
> With this, it is hard to write sources, where the state and the snapshot
> barriers are exactly aligned.
> When performing the checkpoint, the vertex will grab the state from the
> source and inject a checkpoint barrier. It is not clear that the injected
> barrier aligns with the state, because the source may have emitted more
> records since grabbing the state, or not emitted the record that is
> reflected in the state (offset).
>
> If we change the interface to a more iterator-like interface (hasNext() and
> next()), then the vertex calls these methods and can checkpoint in-between
> calling the methods.
> After hasNext() is a well defined point, where the state can be grabbed and
> the barrier be emitted.
>
>
> Any opinions on that?
>
>
> Stephan

Re: Change Streaming Source Function Interface

Stephan Ewen
The variant with the "run()" method requires strong assumptions
about the internal behavior of the source.

Unless I am overlooking something, the source needs to guarantee this:

 - It needs to lock internally and perform the state update and record emit
call inside the locked scope

 - It needs to use the same state object all the time, otherwise the driver
and the source may lock different objects

 - The second point makes it very hard to support sources that return
copies (or shadow copies) of the state, to support asynchronous
snapshotting.

 - A per-element lock is an overhead that we could avoid with the "next()"
approach.
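
To illustrate the second point with a small sketch (the Offset and CopyingSource classes are hypothetical, chosen only to show the problem): if the source replaces its state object on every update and both sides lock on "the state", the driver can end up holding a lock on an object the source no longer uses.

```java
public class StateLockIdentitySketch {

    /** Immutable state; every update produces a new object. */
    static final class Offset {
        final long value;
        Offset(long value) { this.value = value; }
    }

    /** Hypothetical source that locks on its current state object. */
    static class CopyingSource {
        private Offset state = new Offset(0);

        void emitOne() {
            synchronized (state) {
                // The state object is swapped while it is used as the lock.
                state = new Offset(state.value + 1);
            }
        }

        Offset getState() { return state; }
    }

    /** Returns whether the driver's lock object is still the source's state object. */
    static boolean sameLockAfterUpdate() {
        CopyingSource source = new CopyingSource();
        Offset lockSeenByDriver = source.getState();
        source.emitOne();
        return lockSeenByDriver == source.getState();
    }

    public static void main(String[] args) {
        System.out.println(sameLockAfterUpdate());
    }
}
```

After a single update the two references differ, so a synchronized block on the driver's side would no longer exclude the source.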


On Thu, Apr 30, 2015 at 10:04 PM, Gyula Fóra <[hidden email]> wrote:

> Hi,
>
> The only thing we need is to guarantee that the source will not output any
> records or update the state while we take the snapshot and send the
> barrier. There are multiple ways of doing this I guess. We could simply
> lock on these objects for instance or add the methods you wrote. If we
> lock, we can assure that no user thread will find a way around the next()
> and hasNext() (which would otherwise cause problems), and we can also keep
> the current interface.
>
> I think we just need to figure out what is the preferable user interface
> for sources, having a simple run and cancel methods or going with the
> next(), hasNext etc. Or we could just support both.
>
> Gyula

Re: Change Streaming Source Function Interface

Gyula Fóra
Okay, sounds very reasonable :)

On Thu, Apr 30, 2015 at 10:15 PM, Stephan Ewen <[hidden email]> wrote:

> For the variant with the "run()" method, this requires strong assumptions
> about the internal behavior of the source.
>
> Unless I am overlooking something, the source needs to guarantee this:
>
>  - It needs to lock internally and perform the state update and record emit
> call inside the locked scope
>
>  - It needs to use the same state object all the time, otherwise the driver
> and the source may lock different objects
>
>  - The second point makes it very hard to support sources that return
> copies (or shadow copies) of the state, to support asynchronous
> snapshotting.
>
>  - A per-element lock is an overhead that we could avoid with the "next()"
> approach.