[DISCUSS] FLIP-27: Refactor Source Interface

classic Classic list List threaded Threaded
86 messages Options
12345
Reply | Threaded
Open this post in threaded view
|

[DISCUSS] FLIP-27: Refactor Source Interface

Aljoscha Krettek-2
Hi All,

In order to finally get the ball rolling on the new source interface that we have discussed for so long I finally created a FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface

I cc'ed Thomas and Jamie because of the ongoing work/discussion about adding per-partition watermark support to the Kinesis source and because this would enable generic implementation of event-time alignment for all sources. Maybe we need another FLIP for the event-time alignment part, especially the part about information sharing between operations (I'm not calling it state sharing because state has a special meaning in Flink).

Please discuss away!

Aljoscha


Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Piotr Nowojski
Hi,

Thanks Aljoscha for starting this, it’s blocking quite a lot of other possible improvements. I have one proposal. Instead of having a method:

boolean advance() throws IOException;

I would replace it with

/*
 * Return a future, which when completed means that source has more data and getNext() will not block.
 * If you wish to use benefits of non blocking connectors, please implement this method appropriately.
 */
default CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null);
}

And rename `getCurrent()` to `getNext()`.

Couple of arguments:
1. I don’t understand the division of work between `advance()` and `getCurrent()`. What should be done in which, especially for connectors that handle records in batches (like Kafka) and when should you call `advance` and when `getCurrent()`.
2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the future to have asynchronous/non blocking connectors and more efficiently handle large number of blocked threads, without busy waiting. While at the same time it doesn’t add much complexity, since naive connector implementations can be always blocking.
3. This also would allow us to use a fixed size thread pool of task executors, instead of one thread per task.

Piotrek

> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]> wrote:
>
> Hi All,
>
> In order to finally get the ball rolling on the new source interface that we have discussed for so long I finally created a FLIP: https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>
> I cc'ed Thomas and Jamie because of the ongoing work/discussion about adding per-partition watermark support to the Kinesis source and because this would enable generic implementation of event-time alignment for all sources. Maybe we need another FLIP for the event-time alignment part, especially the part about information sharing between operations (I'm not calling it state sharing because state has a special meaning in Flink).
>
> Please discuss away!
>
> Aljoscha
>
>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Thanks for the FLIP, Aljoscha.

The proposal makes sense to me. Separating the split discovery and
consumption is very useful as it enables Flink to better manage the sources.

Looking at the interface, I have a few questions:
1. *SplitEnumerator*.*discoverNewSplits()* seems assuming that the number
of splits can only increase, In your example, the source was Kafka, so the
assumption was true. But I am wondering are there case that the number of
splits can decrease?
2. I agree with Piotr that we need to be careful about potentially blocking
implementations. However, it is not clear to me how does the completable
future work if the underlying reader does not have its own thread (e.g. a
Kafka consumer). In that case, the future will never be completed unless
the caller thread touches the reader again. I am wondering if the following
interfaces for the reader makes sense:
    boolean isDone(); // Whether the source has more records.
    T poll(); // non-blocking read. We can add a timeout if needed.
    T take(); // blocking read;
This seems more intuitive to people who are familiar with existing
convention of poll() and take(). And with the non-blocking poll() we could
have an nio Selector-like API when there are multiple splits.

BTW, it would be really helpful if there is some Java doc describing the
behavior of the the interfaces in the FLIP.

Thanks again for the great proposal.

Jiangjie (Becket) Qin

On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <[hidden email]>
wrote:

> Hi,
>
> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> possible improvements. I have one proposal. Instead of having a method:
>
> boolean advance() throws IOException;
>
> I would replace it with
>
> /*
>  * Return a future, which when completed means that source has more data
> and getNext() will not block.
>  * If you wish to use benefits of non blocking connectors, please
> implement this method appropriately.
>  */
> default CompletableFuture<?> isBlocked() {
>         return CompletableFuture.completedFuture(null);
> }
>
> And rename `getCurrent()` to `getNext()`.
>
> Couple of arguments:
> 1. I don’t understand the division of work between `advance()` and
> `getCurrent()`. What should be done in which, especially for connectors
> that handle records in batches (like Kafka) and when should you call
> `advance` and when `getCurrent()`.
> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> future to have asynchronous/non blocking connectors and more efficiently
> handle large number of blocked threads, without busy waiting. While at the
> same time it doesn’t add much complexity, since naive connector
> implementations can be always blocking.
> 3. This also would allow us to use a fixed size thread pool of task
> executors, instead of one thread per task.
>
> Piotrek
>
> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]> wrote:
> >
> > Hi All,
> >
> > In order to finally get the ball rolling on the new source interface
> that we have discussed for so long I finally created a FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> adding per-partition watermark support to the Kinesis source and because
> this would enable generic implementation of event-time alignment for all
> sources. Maybe we need another FLIP for the event-time alignment part,
> especially the part about information sharing between operations (I'm not
> calling it state sharing because state has a special meaning in Flink).
> >
> > Please discuss away!
> >
> > Aljoscha
> >
> >
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Piotr Nowojski
Hey Becket,

Re 2.

With:

If source is purely single threaded and blocking, then it could be implemented in the following way:

/*
* Return a future, which when completed means that source has more data and getNext() will not block.
* If you wish to use benefits of non blocking connectors, please implement this method appropriately.
*/
CompletableFuture<?> isBlocked() {
        return CompletableFuture.completedFuture(null); // this would be the default behaviour, so user wouldn’t need to override this at all
}

T getNext() {
        // do some blocking reading operation
        return result;
}

Implementing `isBlocked` doesn’t have to be mandatory. It’s more like an optional optimisation that some connectors might provide.

Providing non blocking `poll` method doesn’t solve the problem of actually limiting the number of active threads. One of the potential benefits of `CompletableFuture<?> isBlocked()` is that we could have a fixed size pool of worker threads. Worker thread could pick a non blocked task that’s waiting to be executed and to this `CompletableFuture<?>` would be needed to juggle between blocked/active state. Other potential side benefit could be for reporting in UI/metrics which tasks are blocked (kind of like current back pressure monitoring).

Maybe such extension could use of some PoC that would (or not) show some benefits.

Piotrek

> On 1 Nov 2018, at 19:29, Becket Qin <[hidden email]> wrote:
>
> Thanks for the FLIP, Aljoscha.
>
> The proposal makes sense to me. Separating the split discovery and
> consumption is very useful as it enables Flink to better manage the sources.
>
> Looking at the interface, I have a few questions:
> 1. *SplitEnumerator*.*discoverNewSplits()* seems assuming that the number
> of splits can only increase, In your example, the source was Kafka, so the
> assumption was true. But I am wondering are there case that the number of
> splits can decrease?
> 2. I agree with Piotr that we need to be careful about potentially blocking
> implementations. However, it is not clear to me how does the completable
> future work if the underlying reader does not have its own thread (e.g. a
> Kafka consumer). In that case, the future will never be completed unless
> the caller thread touches the reader again. I am wondering if the following
> interfaces for the reader makes sense:
>    boolean isDone(); // Whether the source has more records.
>    T poll(); // non-blocking read. We can add a timeout if needed.
>    T take(); // blocking read;
> This seems more intuitive to people who are familiar with existing
> convention of poll() and take(). And with the non-blocking poll() we could
> have an nio Selector-like API when there are multiple splits.
>
> BTW, it would be really helpful if there is some Java doc describing the
> behavior of the the interfaces in the FLIP.
>
> Thanks again for the great proposal.
>
> Jiangjie (Becket) Qin
>
> On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <[hidden email]>
> wrote:
>
>> Hi,
>>
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>>
>> boolean advance() throws IOException;
>>
>> I would replace it with
>>
>> /*
>> * Return a future, which when completed means that source has more data
>> and getNext() will not block.
>> * If you wish to use benefits of non blocking connectors, please
>> implement this method appropriately.
>> */
>> default CompletableFuture<?> isBlocked() {
>>        return CompletableFuture.completedFuture(null);
>> }
>>
>> And rename `getCurrent()` to `getNext()`.
>>
>> Couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`. What should be done in which, especially for connectors
>> that handle records in batches (like Kafka) and when should you call
>> `advance` and when `getCurrent()`.
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non blocking connectors and more efficiently
>> handle large number of blocked threads, without busy waiting. While at the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can be always blocking.
>> 3. This also would allow us to use a fixed size thread pool of task
>> executors, instead of one thread per task.
>>
>> Piotrek
>>
>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]> wrote:
>>>
>>> Hi All,
>>>
>>> In order to finally get the ball rolling on the new source interface
>> that we have discussed for so long I finally created a FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>
>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>> adding per-partition watermark support to the Kinesis source and because
>> this would enable generic implementation of event-time alignment for all
>> sources. Maybe we need another FLIP for the event-time alignment part,
>> especially the part about information sharing between operations (I'm not
>> calling it state sharing because state has a special meaning in Flink).
>>>
>>> Please discuss away!
>>>
>>> Aljoscha
>>>
>>>
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Thanks for the explanation, Piotr,

I agree that the completable future solution would work for single-threaded
readers. From API perspective, returning a completable future means the
reader must have an internal thread to complete that future. I was actually
thinking of some sources that are "thread-less" like the Kafka consumers.

The Kafka consumer itself does not have an internal thread except the
heartbeat thread which does nothing but heartbeat. So a Kafka consumer
relies on the user thread to call poll() to make progress. Inside the
poll() methods, things are asynchronous (with very few exceptions such as
rebalance which cannot be interrupted), receiving responses, sending
FetchRequests, Heartbeat, etc. So technically speaking, the consumer itself
is "thread-less",

In that case, if the consumer provides a isBlocked() method and return a
CompletableFuture, unless the users call poll() again, that
CompletableFuture will never be completed because the consumer itself does
not have any thread to complete that future. Instead, it relies on the user
thread, which is holding the future, to complete that same future.

While it looks counter intuitive at the first glance, such thread-less
readers could be more efficient in some cases. For example, if there are
hundreds of readers in a single task, the thread-less readers can be
managed by a single thread. That thread just need to call poll() on each
readers. On the other hand, for the single-threaded readers, there will be
one thread per reader, hundreds of threads in total. From this perspective,
such thread-less readers can do pretty well in terms of limiting the number
of threads. And users can also choose to use a thread pool to manage these
thread-less readers if they wish. And it is also trivial to wrap such
readers to create a single-threaded reader.

BTW, regarding the isBlock() method, I have a few more questions.
21, Is a method isReady() with boolean as a return value equivalent?
Personally I found it is a little bit confusing in what is supposed to be
returned when the future is completed.
22. if the implementation of isBlocked() is optional, how do the callers
know whether the method is properly implemented or not? Does not
implemented mean it always return a completed future?

Thanks,

Jiangjie (Becket) Qin




On Sat, Nov 3, 2018 at 4:30 AM Piotr Nowojski <[hidden email]>
wrote:

> Hey Becket,
>
> Re 2.
>
> With:
>
> If source is purely single threaded and blocking, then it could be
> implemented in the following way:
>
> /*
> * Return a future, which when completed means that source has more data
> and getNext() will not block.
> * If you wish to use benefits of non blocking connectors, please implement
> this method appropriately.
> */
> CompletableFuture<?> isBlocked() {
>         return CompletableFuture.completedFuture(null); // this would be
> the default behaviour, so user wouldn’t need to override this at all
> }
>
> T getNext() {
>         // do some blocking reading operation
>         return result;
> }
>
> Implementing `isBlocked` doesn’t have to be mandatory. It’s more like an
> optional optimisation that some connectors might provide.
>
> Providing non blocking `poll` method doesn’t solve the problem of actually
> limiting the number of active threads. One of the potential benefits of
> `CompletableFuture<?> isBlocked()` is that we could have a fixed size pool
> of worker threads. Worker thread could pick a non blocked task that’s
> waiting to be executed and to this `CompletableFuture<?>` would be needed
> to juggle between blocked/active state. Other potential side benefit could
> be for reporting in UI/metrics which tasks are blocked (kind of like
> current back pressure monitoring).
>
> Maybe such extension could use of some PoC that would (or not) show some
> benefits.
>
> Piotrek
>
> > On 1 Nov 2018, at 19:29, Becket Qin <[hidden email]> wrote:
> >
> > Thanks for the FLIP, Aljoscha.
> >
> > The proposal makes sense to me. Separating the split discovery and
> > consumption is very useful as it enables Flink to better manage the
> sources.
> >
> > Looking at the interface, I have a few questions:
> > 1. *SplitEnumerator*.*discoverNewSplits()* seems assuming that the number
> > of splits can only increase, In your example, the source was Kafka, so
> the
> > assumption was true. But I am wondering are there case that the number of
> > splits can decrease?
> > 2. I agree with Piotr that we need to be careful about potentially
> blocking
> > implementations. However, it is not clear to me how does the completable
> > future work if the underlying reader does not have its own thread (e.g. a
> > Kafka consumer). In that case, the future will never be completed unless
> > the caller thread touches the reader again. I am wondering if the
> following
> > interfaces for the reader makes sense:
> >    boolean isDone(); // Whether the source has more records.
> >    T poll(); // non-blocking read. We can add a timeout if needed.
> >    T take(); // blocking read;
> > This seems more intuitive to people who are familiar with existing
> > convention of poll() and take(). And with the non-blocking poll() we
> could
> > have an nio Selector-like API when there are multiple splits.
> >
> > BTW, it would be really helpful if there is some Java doc describing the
> > behavior of the the interfaces in the FLIP.
> >
> > Thanks again for the great proposal.
> >
> > Jiangjie (Becket) Qin
> >
> > On Thu, Nov 1, 2018 at 6:08 PM Piotr Nowojski <[hidden email]>
> > wrote:
> >
> >> Hi,
> >>
> >> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> >> possible improvements. I have one proposal. Instead of having a method:
> >>
> >> boolean advance() throws IOException;
> >>
> >> I would replace it with
> >>
> >> /*
> >> * Return a future, which when completed means that source has more data
> >> and getNext() will not block.
> >> * If you wish to use benefits of non blocking connectors, please
> >> implement this method appropriately.
> >> */
> >> default CompletableFuture<?> isBlocked() {
> >>        return CompletableFuture.completedFuture(null);
> >> }
> >>
> >> And rename `getCurrent()` to `getNext()`.
> >>
> >> Couple of arguments:
> >> 1. I don’t understand the division of work between `advance()` and
> >> `getCurrent()`. What should be done in which, especially for connectors
> >> that handle records in batches (like Kafka) and when should you call
> >> `advance` and when `getCurrent()`.
> >> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> >> future to have asynchronous/non blocking connectors and more efficiently
> >> handle large number of blocked threads, without busy waiting. While at
> the
> >> same time it doesn’t add much complexity, since naive connector
> >> implementations can be always blocking.
> >> 3. This also would allow us to use a fixed size thread pool of task
> >> executors, instead of one thread per task.
> >>
> >> Piotrek
> >>
> >>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> wrote:
> >>>
> >>> Hi All,
> >>>
> >>> In order to finally get the ball rolling on the new source interface
> >> that we have discussed for so long I finally created a FLIP:
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>
> >>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> >> adding per-partition watermark support to the Kinesis source and because
> >> this would enable generic implementation of event-time alignment for all
> >> sources. Maybe we need another FLIP for the event-time alignment part,
> >> especially the part about information sharing between operations (I'm
> not
> >> calling it state sharing because state has a special meaning in Flink).
> >>>
> >>> Please discuss away!
> >>>
> >>> Aljoscha
> >>>
> >>>
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Guowei Ma
In reply to this post by Piotr Nowojski
Hi,
Thanks Aljoscha for this FLIP.

1. I agree with Piotr and Becket that the non-blocking source is very
important. But in addition to `Future/poll`, there may be another way to
achieve this. I think it may be not very memory friendly if every advance
call return a Future.

public interface Listener {
     public void notify();
}

public interface SplitReader() {
     /**
      * When there is no element temporarily, this will return false.
      * When elements is available again splitReader can call
listener.notify()
      * In addition the frame would check `advance` periodically .
      * Of course advance can always return true and ignore the listener
argument for simplicity.
      */
     public boolean advance(Listener listener);
}

2.  The FLIP tells us very clearly that how to create all Splits and how to
create a SplitReader from a Split. But there is no strategy for the user to
choose how to assign the splits to the tasks. I think we could add a Enum
to let user to choose.
/**
  public Enum SplitsAssignmentPolicy {
    Location,
    Workload,
    Random,
    Average
  }
*/

3. If merge the `advance` and `getCurrent`  to one method like `getNext`
the `getNext` would need return a `ElementWithTimestamp` because some
sources want to add timestamp to every element. IMO, this is not so memory
friendly so I prefer this design.



Thanks

Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:

> Hi,
>
> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> possible improvements. I have one proposal. Instead of having a method:
>
> boolean advance() throws IOException;
>
> I would replace it with
>
> /*
>  * Return a future, which when completed means that source has more data
> and getNext() will not block.
>  * If you wish to use benefits of non blocking connectors, please
> implement this method appropriately.
>  */
> default CompletableFuture<?> isBlocked() {
>         return CompletableFuture.completedFuture(null);
> }
>
> And rename `getCurrent()` to `getNext()`.
>
> Couple of arguments:
> 1. I don’t understand the division of work between `advance()` and
> `getCurrent()`. What should be done in which, especially for connectors
> that handle records in batches (like Kafka) and when should you call
> `advance` and when `getCurrent()`.
> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> future to have asynchronous/non blocking connectors and more efficiently
> handle large number of blocked threads, without busy waiting. While at the
> same time it doesn’t add much complexity, since naive connector
> implementations can be always blocking.
> 3. This also would allow us to use a fixed size thread pool of task
> executors, instead of one thread per task.
>
> Piotrek
>
> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]> wrote:
> >
> > Hi All,
> >
> > In order to finally get the ball rolling on the new source interface
> that we have discussed for so long I finally created a FLIP:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >
> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> adding per-partition watermark support to the Kinesis source and because
> this would enable generic implementation of event-time alignment for all
> sources. Maybe we need another FLIP for the event-time alignment part,
> especially the part about information sharing between operations (I'm not
> calling it state sharing because state has a special meaning in Flink).
> >
> > Please discuss away!
> >
> > Aljoscha
> >
> >
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Thomas Weise
Thanks for getting the ball rolling on this!

Can the number of splits decrease? Yes, splits can be closed and go away.
An example would be a shard merge in Kinesis (2 existing shards will be
closed and replaced with a new shard).

Regarding advance/poll/take: IMO the least restrictive approach would be
the thread-less IO model (pull based, non-blocking, caller retrieves new
records when available). The current Kinesis API requires the use of
threads. But that can be internal to the split reader and does not need to
be a source API concern. In fact, that's what we are working on right now
as improvement to the existing consumer: Each shard consumer thread will
push to a queue, the consumer main thread will poll the queue(s). It is
essentially a mapping from threaded IO to non-blocking.

The proposed SplitReader interface would fit the thread-less IO model.
Similar to an iterator, we find out if there is a new element (hasNext) and
if so, move to it (next()). Separate calls deliver the meta information
(timestamp, watermark). Perhaps advance call could offer a timeout option,
so that the caller does not end up in a busy wait. On the other hand, a
caller processing multiple splits may want to cycle through fast, to
process elements of other splits as soon as they become available. The nice
thing is that this "split merge" logic can now live in Flink and be
optimized and shared between different sources.

Thanks,
Thomas


On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:

> Hi,
> Thanks Aljoscha for this FLIP.
>
> 1. I agree with Piotr and Becket that the non-blocking source is very
> important. But in addition to `Future/poll`, there may be another way to
> achieve this. I think it may be not very memory friendly if every advance
> call return a Future.
>
> public interface Listener {
>      public void notify();
> }
>
> public interface SplitReader() {
>      /**
>       * When there is no element temporarily, this will return false.
>       * When elements is available again splitReader can call
> listener.notify()
>       * In addition the frame would check `advance` periodically .
>       * Of course advance can always return true and ignore the listener
> argument for simplicity.
>       */
>      public boolean advance(Listener listener);
> }
>
> 2.  The FLIP tells us very clearly that how to create all Splits and how
> to create a SplitReader from a Split. But there is no strategy for the user
> to choose how to assign the splits to the tasks. I think we could add a
> Enum to let user to choose.
> /**
>   public Enum SplitsAssignmentPolicy {
>     Location,
>     Workload,
>     Random,
>     Average
>   }
> */
>
> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
> the `getNext` would need return a `ElementWithTimestamp` because some
> sources want to add timestamp to every element. IMO, this is not so memory
> friendly so I prefer this design.
>
>
>
> Thanks
>
> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
>
>> Hi,
>>
>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>> possible improvements. I have one proposal. Instead of having a method:
>>
>> boolean advance() throws IOException;
>>
>> I would replace it with
>>
>> /*
>>  * Return a future, which when completed means that source has more data
>> and getNext() will not block.
>>  * If you wish to use benefits of non blocking connectors, please
>> implement this method appropriately.
>>  */
>> default CompletableFuture<?> isBlocked() {
>>         return CompletableFuture.completedFuture(null);
>> }
>>
>> And rename `getCurrent()` to `getNext()`.
>>
>> Couple of arguments:
>> 1. I don’t understand the division of work between `advance()` and
>> `getCurrent()`. What should be done in which, especially for connectors
>> that handle records in batches (like Kafka) and when should you call
>> `advance` and when `getCurrent()`.
>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>> future to have asynchronous/non blocking connectors and more efficiently
>> handle large number of blocked threads, without busy waiting. While at the
>> same time it doesn’t add much complexity, since naive connector
>> implementations can be always blocking.
>> 3. This also would allow us to use a fixed size thread pool of task
>> executors, instead of one thread per task.
>>
>> Piotrek
>>
>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]> wrote:
>> >
>> > Hi All,
>> >
>> > In order to finally get the ball rolling on the new source interface
>> that we have discussed for so long I finally created a FLIP:
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>> >
>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>> adding per-partition watermark support to the Kinesis source and because
>> this would enable generic implementation of event-time alignment for all
>> sources. Maybe we need another FLIP for the event-time alignment part,
>> especially the part about information sharing between operations (I'm not
>> calling it state sharing because state has a special meaning in Flink).
>> >
>> > Please discuss away!
>> >
>> > Aljoscha
>> >
>> >
>>
>>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Thomas Weise
Couple more points regarding discovery:

The proposal mentions that discovery could be outside the execution graph.
Today, discovered partitions/shards are checkpointed. I believe that will
also need to be the case in the future, even when discovery and reading are
split between different tasks.

For cases such as resharding of a Kinesis stream, the relationship between
splits needs to be considered. Splits cannot be randomly distributed over
readers in certain situations. An example was mentioned here:
https://github.com/apache/flink/pull/6980#issuecomment-435202809

Thomas


On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:

> Thanks for getting the ball rolling on this!
>
> Can the number of splits decrease? Yes, splits can be closed and go away.
> An example would be a shard merge in Kinesis (2 existing shards will be
> closed and replaced with a new shard).
>
> Regarding advance/poll/take: IMO the least restrictive approach would be
> the thread-less IO model (pull based, non-blocking, caller retrieves new
> records when available). The current Kinesis API requires the use of
> threads. But that can be internal to the split reader and does not need to
> be a source API concern. In fact, that's what we are working on right now
> as improvement to the existing consumer: Each shard consumer thread will
> push to a queue, the consumer main thread will poll the queue(s). It is
> essentially a mapping from threaded IO to non-blocking.
>
> The proposed SplitReader interface would fit the thread-less IO model.
> Similar to an iterator, we find out if there is a new element (hasNext) and
> if so, move to it (next()). Separate calls deliver the meta information
> (timestamp, watermark). Perhaps advance call could offer a timeout option,
> so that the caller does not end up in a busy wait. On the other hand, a
> caller processing multiple splits may want to cycle through fast, to
> process elements of other splits as soon as they become available. The nice
> thing is that this "split merge" logic can now live in Flink and be
> optimized and shared between different sources.
>
> Thanks,
> Thomas
>
>
> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
>
>> Hi,
>> Thanks Aljoscha for this FLIP.
>>
>> 1. I agree with Piotr and Becket that the non-blocking source is very
>> important. But in addition to `Future/poll`, there may be another way to
>> achieve this. I think it may be not very memory friendly if every advance
>> call return a Future.
>>
>> public interface Listener {
>>      public void notify();
>> }
>>
>> public interface SplitReader() {
>>      /**
>>       * When there is no element temporarily, this will return false.
>>       * When elements is available again splitReader can call
>> listener.notify()
>>       * In addition the frame would check `advance` periodically .
>>       * Of course advance can always return true and ignore the listener
>> argument for simplicity.
>>       */
>>      public boolean advance(Listener listener);
>> }
>>
>> 2.  The FLIP tells us very clearly that how to create all Splits and how
>> to create a SplitReader from a Split. But there is no strategy for the user
>> to choose how to assign the splits to the tasks. I think we could add a
>> Enum to let user to choose.
>> /**
>>   public Enum SplitsAssignmentPolicy {
>>     Location,
>>     Workload,
>>     Random,
>>     Average
>>   }
>> */
>>
>> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
>> the `getNext` would need return a `ElementWithTimestamp` because some
>> sources want to add timestamp to every element. IMO, this is not so memory
>> friendly so I prefer this design.
>>
>>
>>
>> Thanks
>>
>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
>>
>>> Hi,
>>>
>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>> possible improvements. I have one proposal. Instead of having a method:
>>>
>>> boolean advance() throws IOException;
>>>
>>> I would replace it with
>>>
>>> /*
>>>  * Return a future, which when completed means that source has more data
>>> and getNext() will not block.
>>>  * If you wish to use benefits of non blocking connectors, please
>>> implement this method appropriately.
>>>  */
>>> default CompletableFuture<?> isBlocked() {
>>>         return CompletableFuture.completedFuture(null);
>>> }
>>>
>>> And rename `getCurrent()` to `getNext()`.
>>>
>>> Couple of arguments:
>>> 1. I don’t understand the division of work between `advance()` and
>>> `getCurrent()`. What should be done in which, especially for connectors
>>> that handle records in batches (like Kafka) and when should you call
>>> `advance` and when `getCurrent()`.
>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>> future to have asynchronous/non blocking connectors and more efficiently
>>> handle large number of blocked threads, without busy waiting. While at the
>>> same time it doesn’t add much complexity, since naive connector
>>> implementations can be always blocking.
>>> 3. This also would allow us to use a fixed size thread pool of task
>>> executors, instead of one thread per task.
>>>
>>> Piotrek
>>>
>>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
>>> wrote:
>>> >
>>> > Hi All,
>>> >
>>> > In order to finally get the ball rolling on the new source interface
>>> that we have discussed for so long I finally created a FLIP:
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>> >
>>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>> adding per-partition watermark support to the Kinesis source and because
>>> this would enable generic implementation of event-time alignment for all
>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>> especially the part about information sharing between operations (I'm not
>>> calling it state sharing because state has a special meaning in Flink).
>>> >
>>> > Please discuss away!
>>> >
>>> > Aljoscha
>>> >
>>> >
>>>
>>>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Hi Thomas,

The iterator-like API was also the first thing that came to me. But it
seems a little confusing that hasNext() does not mean "the stream has not
ended", but means "the next record is ready", which is repurposing the well
known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
additional isNextReady() method to indicate whether the next record is
ready seems more intuitive to me.

Similarly, in poll()/take() pattern, another method of isDone() is needed
to indicate whether the stream has ended or not.

Compared with hasNext()/next()/isNextReady() pattern,
isDone()/poll()/take() seems more flexible for the reader implementation.
When I am implementing a reader, I could have a couple of choices:

   - A thread-less reader that does not have any internal thread.
   - When poll() is called, the same calling thread will perform a bunch of
      IO asynchronously.
      - When take() is called, the same calling thread will perform a bunch
      of IO and wait until the record is ready.
   - A reader with internal threads performing network IO and put records
   into a buffer.
      - When poll() is called, the calling thread simply reads from the
      buffer and return empty result immediately if there is no record.
      - When take() is called, the calling thread reads from the buffer and
      block waiting if the buffer is empty.

On the other hand, with the hasNext()/next()/isNextReady() API, it is less
intuitive for the reader developers to write the thread-less pattern.
Although technically speaking one can still do the asynchronous IO to
prepare the record in isNextReady(). But it is inexplicit and seems
somewhat hacky.

Thanks,

Jiangjie (Becket) Qin



On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:

> Couple more points regarding discovery:
>
> The proposal mentions that discovery could be outside the execution graph.
> Today, discovered partitions/shards are checkpointed. I believe that will
> also need to be the case in the future, even when discovery and reading are
> split between different tasks.
>
> For cases such as resharding of a Kinesis stream, the relationship between
> splits needs to be considered. Splits cannot be randomly distributed over
> readers in certain situations. An example was mentioned here:
> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>
> Thomas
>
>
> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
>
> > Thanks for getting the ball rolling on this!
> >
> > Can the number of splits decrease? Yes, splits can be closed and go away.
> > An example would be a shard merge in Kinesis (2 existing shards will be
> > closed and replaced with a new shard).
> >
> > Regarding advance/poll/take: IMO the least restrictive approach would be
> > the thread-less IO model (pull based, non-blocking, caller retrieves new
> > records when available). The current Kinesis API requires the use of
> > threads. But that can be internal to the split reader and does not need
> to
> > be a source API concern. In fact, that's what we are working on right now
> > as improvement to the existing consumer: Each shard consumer thread will
> > push to a queue, the consumer main thread will poll the queue(s). It is
> > essentially a mapping from threaded IO to non-blocking.
> >
> > The proposed SplitReader interface would fit the thread-less IO model.
> > Similar to an iterator, we find out if there is a new element (hasNext)
> and
> > if so, move to it (next()). Separate calls deliver the meta information
> > (timestamp, watermark). Perhaps advance call could offer a timeout
> option,
> > so that the caller does not end up in a busy wait. On the other hand, a
> > caller processing multiple splits may want to cycle through fast, to
> > process elements of other splits as soon as they become available. The
> nice
> > thing is that this "split merge" logic can now live in Flink and be
> > optimized and shared between different sources.
> >
> > Thanks,
> > Thomas
> >
> >
> > On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
> >
> >> Hi,
> >> Thanks Aljoscha for this FLIP.
> >>
> >> 1. I agree with Piotr and Becket that the non-blocking source is very
> >> important. But in addition to `Future/poll`, there may be another way to
> >> achieve this. I think it may be not very memory friendly if every
> advance
> >> call return a Future.
> >>
> >> public interface Listener {
> >>      public void notify();
> >> }
> >>
> >> public interface SplitReader() {
> >>      /**
> >>       * When there is no element temporarily, this will return false.
> >>       * When elements is available again splitReader can call
> >> listener.notify()
> >>       * In addition the frame would check `advance` periodically .
> >>       * Of course advance can always return true and ignore the listener
> >> argument for simplicity.
> >>       */
> >>      public boolean advance(Listener listener);
> >> }
> >>
> >> 2.  The FLIP tells us very clearly that how to create all Splits and how
> >> to create a SplitReader from a Split. But there is no strategy for the
> user
> >> to choose how to assign the splits to the tasks. I think we could add a
> >> Enum to let user to choose.
> >> /**
> >>   public Enum SplitsAssignmentPolicy {
> >>     Location,
> >>     Workload,
> >>     Random,
> >>     Average
> >>   }
> >> */
> >>
> >> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
> >> the `getNext` would need return a `ElementWithTimestamp` because some
> >> sources want to add timestamp to every element. IMO, this is not so
> memory
> >> friendly so I prefer this design.
> >>
> >>
> >>
> >> Thanks
> >>
> >> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> >>
> >>> Hi,
> >>>
> >>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> >>> possible improvements. I have one proposal. Instead of having a method:
> >>>
> >>> boolean advance() throws IOException;
> >>>
> >>> I would replace it with
> >>>
> >>> /*
> >>>  * Return a future, which when completed means that source has more
> data
> >>> and getNext() will not block.
> >>>  * If you wish to use benefits of non blocking connectors, please
> >>> implement this method appropriately.
> >>>  */
> >>> default CompletableFuture<?> isBlocked() {
> >>>         return CompletableFuture.completedFuture(null);
> >>> }
> >>>
> >>> And rename `getCurrent()` to `getNext()`.
> >>>
> >>> Couple of arguments:
> >>> 1. I don’t understand the division of work between `advance()` and
> >>> `getCurrent()`. What should be done in which, especially for connectors
> >>> that handle records in batches (like Kafka) and when should you call
> >>> `advance` and when `getCurrent()`.
> >>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
> >>> future to have asynchronous/non blocking connectors and more
> efficiently
> >>> handle large number of blocked threads, without busy waiting. While at
> the
> >>> same time it doesn’t add much complexity, since naive connector
> >>> implementations can be always blocking.
> >>> 3. This also would allow us to use a fixed size thread pool of task
> >>> executors, instead of one thread per task.
> >>>
> >>> Piotrek
> >>>
> >>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> >>> wrote:
> >>> >
> >>> > Hi All,
> >>> >
> >>> > In order to finally get the ball rolling on the new source interface
> >>> that we have discussed for so long I finally created a FLIP:
> >>>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>> >
> >>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion about
> >>> adding per-partition watermark support to the Kinesis source and
> because
> >>> this would enable generic implementation of event-time alignment for
> all
> >>> sources. Maybe we need another FLIP for the event-time alignment part,
> >>> especially the part about information sharing between operations (I'm
> not
> >>> calling it state sharing because state has a special meaning in Flink).
> >>> >
> >>> > Please discuss away!
> >>> >
> >>> > Aljoscha
> >>> >
> >>> >
> >>>
> >>>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Biao Liu
Thanks Aljoscha for bringing us this discussion!

1. I think one of the reason about separating `advance()` and
`getCurrent()` is that we have several different types returned by source.
Not just the `record`, but also the timestamp of record and the watermark.
If we don't separate these into different methods, the source has to return
a tuple3 which is not so user friendly. The prototype of Aljoscha is
acceptable to me. Regarding the specific method name, I'm not sure which
one is better. Both of them are reasonable for me.

2. As Thomas and Becket mentioned before, I think a non-blocking API is
necessary. Moreover, IMO we should not offer a blocking API. It doesn't
help but makes things more complicated.

3. About the thread model.
I agree with Thomas about the thread-less IO model. A standard workflow
should look like below.
  - If there is available data, Flink would read it.
  - If there is no data available temporary, Flink would check again a
moment later. Maybe waiting on a semaphore until a timer wake it up.
Furthermore, we can offer an optional optimization for source which has
external thread. Like Guowei mentioned, there can be a listener which the
reader can wake the framework up as soon as new data comes. This can solve
Piotr's concern about efficiency.

4. One more thing. After taking a look at the prototype codes. Off the top
of my head, the implementation is more fit for batch job not streaming job.
There are two types of tasks in prototype. First is a source task that
discovers the splits. The source passes the splits to the second task which
process the splits one by one. And then the source keeps watch to discover
more splits.

However, I think the more common scenario of streaming job is:
there are fixed splits, each of the subtasks takes several splits. The
subtasks just keep processing the fixed splits. There would be continuous
datum in each split. We don't need a source task to discover more splits.
It can not be finished in streaming job since we don't want the processing
task finished even there are no more splits.

So IMO we should offer another source operator for the new interface. It
would discover all splits when it is opening. Then picks the splits belong
to this subtask. Keep processing these splits until all of them are
finished.


Becket Qin <[hidden email]> 于2018年11月5日周一 上午11:00写道:

> Hi Thomas,
>
> The iterator-like API was also the first thing that came to me. But it
> seems a little confusing that hasNext() does not mean "the stream has not
> ended", but means "the next record is ready", which is repurposing the well
> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
> additional isNextReady() method to indicate whether the next record is
> ready seems more intuitive to me.
>
> Similarly, in poll()/take() pattern, another method of isDone() is needed
> to indicate whether the stream has ended or not.
>
> Compared with hasNext()/next()/isNextReady() pattern,
> isDone()/poll()/take() seems more flexible for the reader implementation.
> When I am implementing a reader, I could have a couple of choices:
>
>    - A thread-less reader that does not have any internal thread.
>    - When poll() is called, the same calling thread will perform a bunch of
>       IO asynchronously.
>       - When take() is called, the same calling thread will perform a bunch
>       of IO and wait until the record is ready.
>    - A reader with internal threads performing network IO and put records
>    into a buffer.
>       - When poll() is called, the calling thread simply reads from the
>       buffer and return empty result immediately if there is no record.
>       - When take() is called, the calling thread reads from the buffer and
>       block waiting if the buffer is empty.
>
> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
> intuitive for the reader developers to write the thread-less pattern.
> Although technically speaking one can still do the asynchronous IO to
> prepare the record in isNextReady(). But it is inexplicit and seems
> somewhat hacky.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
>
> > Couple more points regarding discovery:
> >
> > The proposal mentions that discovery could be outside the execution
> graph.
> > Today, discovered partitions/shards are checkpointed. I believe that will
> > also need to be the case in the future, even when discovery and reading
> are
> > split between different tasks.
> >
> > For cases such as resharding of a Kinesis stream, the relationship
> between
> > splits needs to be considered. Splits cannot be randomly distributed over
> > readers in certain situations. An example was mentioned here:
> > https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >
> > Thomas
> >
> >
> > On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
> >
> > > Thanks for getting the ball rolling on this!
> > >
> > > Can the number of splits decrease? Yes, splits can be closed and go
> away.
> > > An example would be a shard merge in Kinesis (2 existing shards will be
> > > closed and replaced with a new shard).
> > >
> > > Regarding advance/poll/take: IMO the least restrictive approach would
> be
> > > the thread-less IO model (pull based, non-blocking, caller retrieves
> new
> > > records when available). The current Kinesis API requires the use of
> > > threads. But that can be internal to the split reader and does not need
> > to
> > > be a source API concern. In fact, that's what we are working on right
> now
> > > as improvement to the existing consumer: Each shard consumer thread
> will
> > > push to a queue, the consumer main thread will poll the queue(s). It is
> > > essentially a mapping from threaded IO to non-blocking.
> > >
> > > The proposed SplitReader interface would fit the thread-less IO model.
> > > Similar to an iterator, we find out if there is a new element (hasNext)
> > and
> > > if so, move to it (next()). Separate calls deliver the meta information
> > > (timestamp, watermark). Perhaps advance call could offer a timeout
> > option,
> > > so that the caller does not end up in a busy wait. On the other hand, a
> > > caller processing multiple splits may want to cycle through fast, to
> > > process elements of other splits as soon as they become available. The
> > nice
> > > thing is that this "split merge" logic can now live in Flink and be
> > > optimized and shared between different sources.
> > >
> > > Thanks,
> > > Thomas
> > >
> > >
> > > On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
> > >
> > >> Hi,
> > >> Thanks Aljoscha for this FLIP.
> > >>
> > >> 1. I agree with Piotr and Becket that the non-blocking source is very
> > >> important. But in addition to `Future/poll`, there may be another way
> to
> > >> achieve this. I think it may be not very memory friendly if every
> > advance
> > >> call return a Future.
> > >>
> > >> public interface Listener {
> > >>      public void notify();
> > >> }
> > >>
> > >> public interface SplitReader() {
> > >>      /**
> > >>       * When there is no element temporarily, this will return false.
> > >>       * When elements is available again splitReader can call
> > >> listener.notify()
> > >>       * In addition the frame would check `advance` periodically .
> > >>       * Of course advance can always return true and ignore the
> listener
> > >> argument for simplicity.
> > >>       */
> > >>      public boolean advance(Listener listener);
> > >> }
> > >>
> > >> 2.  The FLIP tells us very clearly that how to create all Splits and
> how
> > >> to create a SplitReader from a Split. But there is no strategy for the
> > user
> > >> to choose how to assign the splits to the tasks. I think we could add
> a
> > >> Enum to let user to choose.
> > >> /**
> > >>   public Enum SplitsAssignmentPolicy {
> > >>     Location,
> > >>     Workload,
> > >>     Random,
> > >>     Average
> > >>   }
> > >> */
> > >>
> > >> 3. If merge the `advance` and `getCurrent`  to one method like
> `getNext`
> > >> the `getNext` would need return a `ElementWithTimestamp` because some
> > >> sources want to add timestamp to every element. IMO, this is not so
> > memory
> > >> friendly so I prefer this design.
> > >>
> > >>
> > >>
> > >> Thanks
> > >>
> > >> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> > >>
> > >>> Hi,
> > >>>
> > >>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> > >>> possible improvements. I have one proposal. Instead of having a
> method:
> > >>>
> > >>> boolean advance() throws IOException;
> > >>>
> > >>> I would replace it with
> > >>>
> > >>> /*
> > >>>  * Return a future, which when completed means that source has more
> > data
> > >>> and getNext() will not block.
> > >>>  * If you wish to use benefits of non blocking connectors, please
> > >>> implement this method appropriately.
> > >>>  */
> > >>> default CompletableFuture<?> isBlocked() {
> > >>>         return CompletableFuture.completedFuture(null);
> > >>> }
> > >>>
> > >>> And rename `getCurrent()` to `getNext()`.
> > >>>
> > >>> Couple of arguments:
> > >>> 1. I don’t understand the division of work between `advance()` and
> > >>> `getCurrent()`. What should be done in which, especially for
> connectors
> > >>> that handle records in batches (like Kafka) and when should you call
> > >>> `advance` and when `getCurrent()`.
> > >>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
> the
> > >>> future to have asynchronous/non blocking connectors and more
> > efficiently
> > >>> handle large number of blocked threads, without busy waiting. While
> at
> > the
> > >>> same time it doesn’t add much complexity, since naive connector
> > >>> implementations can be always blocking.
> > >>> 3. This also would allow us to use a fixed size thread pool of task
> > >>> executors, instead of one thread per task.
> > >>>
> > >>> Piotrek
> > >>>
> > >>> > On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> > >>> wrote:
> > >>> >
> > >>> > Hi All,
> > >>> >
> > >>> > In order to finally get the ball rolling on the new source
> interface
> > >>> that we have discussed for so long I finally created a FLIP:
> > >>>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >>> >
> > >>> > I cc'ed Thomas and Jamie because of the ongoing work/discussion
> about
> > >>> adding per-partition watermark support to the Kinesis source and
> > because
> > >>> this would enable generic implementation of event-time alignment for
> > all
> > >>> sources. Maybe we need another FLIP for the event-time alignment
> part,
> > >>> especially the part about information sharing between operations (I'm
> > not
> > >>> calling it state sharing because state has a special meaning in
> Flink).
> > >>> >
> > >>> > Please discuss away!
> > >>> >
> > >>> > Aljoscha
> > >>> >
> > >>> >
> > >>>
> > >>>
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Aljoscha Krettek-2
I updated the FLIP [1] with some Javadoc for the SplitReader to outline what I had in mind with the interface. Sorry for not doing that earlier, it's not quite clear how the methods should work from the name alone.

The gist of it is that advance() should be non-blocking, so isDone/advance()/getCurrent() are very similar to isDone()/poll()/take() that I have seen mentioned.

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface <https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface>

> On 5. Nov 2018, at 11:05, Biao Liu <[hidden email]> wrote:
>
> Thanks Aljoscha for bringing us this discussion!
>
> 1. I think one of the reason about separating `advance()` and
> `getCurrent()` is that we have several different types returned by source.
> Not just the `record`, but also the timestamp of record and the watermark.
> If we don't separate these into different methods, the source has to return
> a tuple3 which is not so user friendly. The prototype of Aljoscha is
> acceptable to me. Regarding the specific method name, I'm not sure which
> one is better. Both of them are reasonable for me.
>
> 2. As Thomas and Becket mentioned before, I think a non-blocking API is
> necessary. Moreover, IMO we should not offer a blocking API. It doesn't
> help but makes things more complicated.
>
> 3. About the thread model.
> I agree with Thomas about the thread-less IO model. A standard workflow
> should look like below.
>  - If there is available data, Flink would read it.
>  - If there is no data available temporary, Flink would check again a
> moment later. Maybe waiting on a semaphore until a timer wake it up.
> Furthermore, we can offer an optional optimization for source which has
> external thread. Like Guowei mentioned, there can be a listener which the
> reader can wake the framework up as soon as new data comes. This can solve
> Piotr's concern about efficiency.
>
> 4. One more thing. After taking a look at the prototype codes. Off the top
> of my head, the implementation is more fit for batch job not streaming job.
> There are two types of tasks in prototype. First is a source task that
> discovers the splits. The source passes the splits to the second task which
> process the splits one by one. And then the source keeps watch to discover
> more splits.
>
> However, I think the more common scenario of streaming job is:
> there are fixed splits, each of the subtasks takes several splits. The
> subtasks just keep processing the fixed splits. There would be continuous
> datum in each split. We don't need a source task to discover more splits.
> It can not be finished in streaming job since we don't want the processing
> task finished even there are no more splits.
>
> So IMO we should offer another source operator for the new interface. It
> would discover all splits when it is opening. Then picks the splits belong
> to this subtask. Keep processing these splits until all of them are
> finished.
>
>
> Becket Qin <[hidden email]> 于2018年11月5日周一 上午11:00写道:
>
>> Hi Thomas,
>>
>> The iterator-like API was also the first thing that came to me. But it
>> seems a little confusing that hasNext() does not mean "the stream has not
>> ended", but means "the next record is ready", which is repurposing the well
>> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
>> additional isNextReady() method to indicate whether the next record is
>> ready seems more intuitive to me.
>>
>> Similarly, in poll()/take() pattern, another method of isDone() is needed
>> to indicate whether the stream has ended or not.
>>
>> Compared with hasNext()/next()/isNextReady() pattern,
>> isDone()/poll()/take() seems more flexible for the reader implementation.
>> When I am implementing a reader, I could have a couple of choices:
>>
>>   - A thread-less reader that does not have any internal thread.
>>   - When poll() is called, the same calling thread will perform a bunch of
>>      IO asynchronously.
>>      - When take() is called, the same calling thread will perform a bunch
>>      of IO and wait until the record is ready.
>>   - A reader with internal threads performing network IO and put records
>>   into a buffer.
>>      - When poll() is called, the calling thread simply reads from the
>>      buffer and return empty result immediately if there is no record.
>>      - When take() is called, the calling thread reads from the buffer and
>>      block waiting if the buffer is empty.
>>
>> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
>> intuitive for the reader developers to write the thread-less pattern.
>> Although technically speaking one can still do the asynchronous IO to
>> prepare the record in isNextReady(). But it is inexplicit and seems
>> somewhat hacky.
>>
>> Thanks,
>>
>> Jiangjie (Becket) Qin
>>
>>
>>
>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
>>
>>> Couple more points regarding discovery:
>>>
>>> The proposal mentions that discovery could be outside the execution
>> graph.
>>> Today, discovered partitions/shards are checkpointed. I believe that will
>>> also need to be the case in the future, even when discovery and reading
>> are
>>> split between different tasks.
>>>
>>> For cases such as resharding of a Kinesis stream, the relationship
>> between
>>> splits needs to be considered. Splits cannot be randomly distributed over
>>> readers in certain situations. An example was mentioned here:
>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>
>>> Thomas
>>>
>>>
>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
>>>
>>>> Thanks for getting the ball rolling on this!
>>>>
>>>> Can the number of splits decrease? Yes, splits can be closed and go
>> away.
>>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>>> closed and replaced with a new shard).
>>>>
>>>> Regarding advance/poll/take: IMO the least restrictive approach would
>> be
>>>> the thread-less IO model (pull based, non-blocking, caller retrieves
>> new
>>>> records when available). The current Kinesis API requires the use of
>>>> threads. But that can be internal to the split reader and does not need
>>> to
>>>> be a source API concern. In fact, that's what we are working on right
>> now
>>>> as improvement to the existing consumer: Each shard consumer thread
>> will
>>>> push to a queue, the consumer main thread will poll the queue(s). It is
>>>> essentially a mapping from threaded IO to non-blocking.
>>>>
>>>> The proposed SplitReader interface would fit the thread-less IO model.
>>>> Similar to an iterator, we find out if there is a new element (hasNext)
>>> and
>>>> if so, move to it (next()). Separate calls deliver the meta information
>>>> (timestamp, watermark). Perhaps advance call could offer a timeout
>>> option,
>>>> so that the caller does not end up in a busy wait. On the other hand, a
>>>> caller processing multiple splits may want to cycle through fast, to
>>>> process elements of other splits as soon as they become available. The
>>> nice
>>>> thing is that this "split merge" logic can now live in Flink and be
>>>> optimized and shared between different sources.
>>>>
>>>> Thanks,
>>>> Thomas
>>>>
>>>>
>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
>>>>
>>>>> Hi,
>>>>> Thanks Aljoscha for this FLIP.
>>>>>
>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>> important. But in addition to `Future/poll`, there may be another way
>> to
>>>>> achieve this. I think it may be not very memory friendly if every
>>> advance
>>>>> call return a Future.
>>>>>
>>>>> public interface Listener {
>>>>>     public void notify();
>>>>> }
>>>>>
>>>>> public interface SplitReader() {
>>>>>     /**
>>>>>      * When there is no element temporarily, this will return false.
>>>>>      * When elements is available again splitReader can call
>>>>> listener.notify()
>>>>>      * In addition the frame would check `advance` periodically .
>>>>>      * Of course advance can always return true and ignore the
>> listener
>>>>> argument for simplicity.
>>>>>      */
>>>>>     public boolean advance(Listener listener);
>>>>> }
>>>>>
>>>>> 2.  The FLIP tells us very clearly that how to create all Splits and
>> how
>>>>> to create a SplitReader from a Split. But there is no strategy for the
>>> user
>>>>> to choose how to assign the splits to the tasks. I think we could add
>> a
>>>>> Enum to let user to choose.
>>>>> /**
>>>>>  public Enum SplitsAssignmentPolicy {
>>>>>    Location,
>>>>>    Workload,
>>>>>    Random,
>>>>>    Average
>>>>>  }
>>>>> */
>>>>>
>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>> `getNext`
>>>>> the `getNext` would need return a `ElementWithTimestamp` because some
>>>>> sources want to add timestamp to every element. IMO, this is not so
>>> memory
>>>>> friendly so I prefer this design.
>>>>>
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>>> possible improvements. I have one proposal. Instead of having a
>> method:
>>>>>>
>>>>>> boolean advance() throws IOException;
>>>>>>
>>>>>> I would replace it with
>>>>>>
>>>>>> /*
>>>>>> * Return a future, which when completed means that source has more
>>> data
>>>>>> and getNext() will not block.
>>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>>> implement this method appropriately.
>>>>>> */
>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>        return CompletableFuture.completedFuture(null);
>>>>>> }
>>>>>>
>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>
>>>>>> Couple of arguments:
>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>> `getCurrent()`. What should be done in which, especially for
>> connectors
>>>>>> that handle records in batches (like Kafka) and when should you call
>>>>>> `advance` and when `getCurrent()`.
>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
>> the
>>>>>> future to have asynchronous/non blocking connectors and more
>>> efficiently
>>>>>> handle large number of blocked threads, without busy waiting. While
>> at
>>> the
>>>>>> same time it doesn’t add much complexity, since naive connector
>>>>>> implementations can be always blocking.
>>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>>> executors, instead of one thread per task.
>>>>>>
>>>>>> Piotrek
>>>>>>
>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
>>>>>> wrote:
>>>>>>>
>>>>>>> Hi All,
>>>>>>>
>>>>>>> In order to finally get the ball rolling on the new source
>> interface
>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>>
>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>
>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
>> about
>>>>>> adding per-partition watermark support to the Kinesis source and
>>> because
>>>>>> this would enable generic implementation of event-time alignment for
>>> all
>>>>>> sources. Maybe we need another FLIP for the event-time alignment
>> part,
>>>>>> especially the part about information sharing between operations (I'm
>>> not
>>>>>> calling it state sharing because state has a special meaning in
>> Flink).
>>>>>>>
>>>>>>> Please discuss away!
>>>>>>>
>>>>>>> Aljoscha
>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Thanks for updating the wiki, Aljoscha.

The isDone()/advance()/getCurrent() API looks more similar to
hasNext()/isNextReady()/getNext(), but implying some different behaviors.

If users call getCurrent() twice without calling advance() in between, will
they get the same record back? From the API itself, users might think
advance() is the API that moves the offset forward, and getCurrent() just
return the record at the current offset.

Thanks,

Jiangjie (Becket) Qin

On Mon, Nov 5, 2018 at 10:41 PM Aljoscha Krettek <[hidden email]>
wrote:

> I updated the FLIP [1] with some Javadoc for the SplitReader to outline
> what I had in mind with the interface. Sorry for not doing that earlier,
> it's not quite clear how the methods should work from the name alone.
>
> The gist of it is that advance() should be non-blocking, so
> isDone/advance()/getCurrent() are very similar to isDone()/poll()/take()
> that I have seen mentioned.
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> <
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> >
>
> > On 5. Nov 2018, at 11:05, Biao Liu <[hidden email]> wrote:
> >
> > Thanks Aljoscha for bringing us this discussion!
> >
> > 1. I think one of the reason about separating `advance()` and
> > `getCurrent()` is that we have several different types returned by
> source.
> > Not just the `record`, but also the timestamp of record and the
> watermark.
> > If we don't separate these into different methods, the source has to
> return
> > a tuple3 which is not so user friendly. The prototype of Aljoscha is
> > acceptable to me. Regarding the specific method name, I'm not sure which
> > one is better. Both of them are reasonable for me.
> >
> > 2. As Thomas and Becket mentioned before, I think a non-blocking API is
> > necessary. Moreover, IMO we should not offer a blocking API. It doesn't
> > help but makes things more complicated.
> >
> > 3. About the thread model.
> > I agree with Thomas about the thread-less IO model. A standard workflow
> > should look like below.
> >  - If there is available data, Flink would read it.
> >  - If there is no data available temporary, Flink would check again a
> > moment later. Maybe waiting on a semaphore until a timer wake it up.
> > Furthermore, we can offer an optional optimization for source which has
> > external thread. Like Guowei mentioned, there can be a listener which the
> > reader can wake the framework up as soon as new data comes. This can
> solve
> > Piotr's concern about efficiency.
> >
> > 4. One more thing. After taking a look at the prototype codes. Off the
> top
> > of my head, the implementation is more fit for batch job not streaming
> job.
> > There are two types of tasks in prototype. First is a source task that
> > discovers the splits. The source passes the splits to the second task
> which
> > process the splits one by one. And then the source keeps watch to
> discover
> > more splits.
> >
> > However, I think the more common scenario of streaming job is:
> > there are fixed splits, each of the subtasks takes several splits. The
> > subtasks just keep processing the fixed splits. There would be continuous
> > datum in each split. We don't need a source task to discover more splits.
> > It can not be finished in streaming job since we don't want the
> processing
> > task finished even there are no more splits.
> >
> > So IMO we should offer another source operator for the new interface. It
> > would discover all splits when it is opening. Then picks the splits
> belong
> > to this subtask. Keep processing these splits until all of them are
> > finished.
> >
> >
> > Becket Qin <[hidden email]> 于2018年11月5日周一 上午11:00写道:
> >
> >> Hi Thomas,
> >>
> >> The iterator-like API was also the first thing that came to me. But it
> >> seems a little confusing that hasNext() does not mean "the stream has
> not
> >> ended", but means "the next record is ready", which is repurposing the
> well
> >> known meaning of hasNext(). If we follow the hasNext()/next() pattern,
> an
> >> additional isNextReady() method to indicate whether the next record is
> >> ready seems more intuitive to me.
> >>
> >> Similarly, in poll()/take() pattern, another method of isDone() is
> needed
> >> to indicate whether the stream has ended or not.
> >>
> >> Compared with hasNext()/next()/isNextReady() pattern,
> >> isDone()/poll()/take() seems more flexible for the reader
> implementation.
> >> When I am implementing a reader, I could have a couple of choices:
> >>
> >>   - A thread-less reader that does not have any internal thread.
> >>   - When poll() is called, the same calling thread will perform a bunch
> of
> >>      IO asynchronously.
> >>      - When take() is called, the same calling thread will perform a
> bunch
> >>      of IO and wait until the record is ready.
> >>   - A reader with internal threads performing network IO and put records
> >>   into a buffer.
> >>      - When poll() is called, the calling thread simply reads from the
> >>      buffer and return empty result immediately if there is no record.
> >>      - When take() is called, the calling thread reads from the buffer
> and
> >>      block waiting if the buffer is empty.
> >>
> >> On the other hand, with the hasNext()/next()/isNextReady() API, it is
> less
> >> intuitive for the reader developers to write the thread-less pattern.
> >> Although technically speaking one can still do the asynchronous IO to
> >> prepare the record in isNextReady(). But it is inexplicit and seems
> >> somewhat hacky.
> >>
> >> Thanks,
> >>
> >> Jiangjie (Becket) Qin
> >>
> >>
> >>
> >> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
> >>
> >>> Couple more points regarding discovery:
> >>>
> >>> The proposal mentions that discovery could be outside the execution
> >> graph.
> >>> Today, discovered partitions/shards are checkpointed. I believe that
> will
> >>> also need to be the case in the future, even when discovery and reading
> >> are
> >>> split between different tasks.
> >>>
> >>> For cases such as resharding of a Kinesis stream, the relationship
> >> between
> >>> splits needs to be considered. Splits cannot be randomly distributed
> over
> >>> readers in certain situations. An example was mentioned here:
> >>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>>
> >>> Thomas
> >>>
> >>>
> >>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
> >>>
> >>>> Thanks for getting the ball rolling on this!
> >>>>
> >>>> Can the number of splits decrease? Yes, splits can be closed and go
> >> away.
> >>>> An example would be a shard merge in Kinesis (2 existing shards will
> be
> >>>> closed and replaced with a new shard).
> >>>>
> >>>> Regarding advance/poll/take: IMO the least restrictive approach would
> >> be
> >>>> the thread-less IO model (pull based, non-blocking, caller retrieves
> >> new
> >>>> records when available). The current Kinesis API requires the use of
> >>>> threads. But that can be internal to the split reader and does not
> need
> >>> to
> >>>> be a source API concern. In fact, that's what we are working on right
> >> now
> >>>> as improvement to the existing consumer: Each shard consumer thread
> >> will
> >>>> push to a queue, the consumer main thread will poll the queue(s). It
> is
> >>>> essentially a mapping from threaded IO to non-blocking.
> >>>>
> >>>> The proposed SplitReader interface would fit the thread-less IO model.
> >>>> Similar to an iterator, we find out if there is a new element
> (hasNext)
> >>> and
> >>>> if so, move to it (next()). Separate calls deliver the meta
> information
> >>>> (timestamp, watermark). Perhaps advance call could offer a timeout
> >>> option,
> >>>> so that the caller does not end up in a busy wait. On the other hand,
> a
> >>>> caller processing multiple splits may want to cycle through fast, to
> >>>> process elements of other splits as soon as they become available. The
> >>> nice
> >>>> thing is that this "split merge" logic can now live in Flink and be
> >>>> optimized and shared between different sources.
> >>>>
> >>>> Thanks,
> >>>> Thomas
> >>>>
> >>>>
> >>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]>
> wrote:
> >>>>
> >>>>> Hi,
> >>>>> Thanks Aljoscha for this FLIP.
> >>>>>
> >>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
> >>>>> important. But in addition to `Future/poll`, there may be another way
> >> to
> >>>>> achieve this. I think it may be not very memory friendly if every
> >>> advance
> >>>>> call return a Future.
> >>>>>
> >>>>> public interface Listener {
> >>>>>     public void notify();
> >>>>> }
> >>>>>
> >>>>> public interface SplitReader() {
> >>>>>     /**
> >>>>>      * When there is no element temporarily, this will return false.
> >>>>>      * When elements is available again splitReader can call
> >>>>> listener.notify()
> >>>>>      * In addition the frame would check `advance` periodically .
> >>>>>      * Of course advance can always return true and ignore the
> >> listener
> >>>>> argument for simplicity.
> >>>>>      */
> >>>>>     public boolean advance(Listener listener);
> >>>>> }
> >>>>>
> >>>>> 2.  The FLIP tells us very clearly that how to create all Splits and
> >> how
> >>>>> to create a SplitReader from a Split. But there is no strategy for
> the
> >>> user
> >>>>> to choose how to assign the splits to the tasks. I think we could add
> >> a
> >>>>> Enum to let user to choose.
> >>>>> /**
> >>>>>  public Enum SplitsAssignmentPolicy {
> >>>>>    Location,
> >>>>>    Workload,
> >>>>>    Random,
> >>>>>    Average
> >>>>>  }
> >>>>> */
> >>>>>
> >>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> >> `getNext`
> >>>>> the `getNext` would need return a `ElementWithTimestamp` because some
> >>>>> sources want to add timestamp to every element. IMO, this is not so
> >>> memory
> >>>>> friendly so I prefer this design.
> >>>>>
> >>>>>
> >>>>>
> >>>>> Thanks
> >>>>>
> >>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> >>>>>
> >>>>>> Hi,
> >>>>>>
> >>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> other
> >>>>>> possible improvements. I have one proposal. Instead of having a
> >> method:
> >>>>>>
> >>>>>> boolean advance() throws IOException;
> >>>>>>
> >>>>>> I would replace it with
> >>>>>>
> >>>>>> /*
> >>>>>> * Return a future, which when completed means that source has more
> >>> data
> >>>>>> and getNext() will not block.
> >>>>>> * If you wish to use benefits of non blocking connectors, please
> >>>>>> implement this method appropriately.
> >>>>>> */
> >>>>>> default CompletableFuture<?> isBlocked() {
> >>>>>>        return CompletableFuture.completedFuture(null);
> >>>>>> }
> >>>>>>
> >>>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>>
> >>>>>> Couple of arguments:
> >>>>>> 1. I don’t understand the division of work between `advance()` and
> >>>>>> `getCurrent()`. What should be done in which, especially for
> >> connectors
> >>>>>> that handle records in batches (like Kafka) and when should you call
> >>>>>> `advance` and when `getCurrent()`.
> >>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
> >> the
> >>>>>> future to have asynchronous/non blocking connectors and more
> >>> efficiently
> >>>>>> handle large number of blocked threads, without busy waiting. While
> >> at
> >>> the
> >>>>>> same time it doesn’t add much complexity, since naive connector
> >>>>>> implementations can be always blocking.
> >>>>>> 3. This also would allow us to use a fixed size thread pool of task
> >>>>>> executors, instead of one thread per task.
> >>>>>>
> >>>>>> Piotrek
> >>>>>>
> >>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> >>>>>> wrote:
> >>>>>>>
> >>>>>>> Hi All,
> >>>>>>>
> >>>>>>> In order to finally get the ball rolling on the new source
> >> interface
> >>>>>> that we have discussed for so long I finally created a FLIP:
> >>>>>>
> >>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>>
> >>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> >> about
> >>>>>> adding per-partition watermark support to the Kinesis source and
> >>> because
> >>>>>> this would enable generic implementation of event-time alignment for
> >>> all
> >>>>>> sources. Maybe we need another FLIP for the event-time alignment
> >> part,
> >>>>>> especially the part about information sharing between operations
> (I'm
> >>> not
> >>>>>> calling it state sharing because state has a special meaning in
> >> Flink).
> >>>>>>>
> >>>>>>> Please discuss away!
> >>>>>>>
> >>>>>>> Aljoscha
> >>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Biao Liu
Regarding the naming style.

The advantage of `poll()` style is that basically the name of `poll` means
it should be a non-blocking operator, same with `Queue` in Java API. It's
easy to understand. We don't need to write too much in docs to imply the
implementation should not do something heavy.
However `poll` also means it should return the thing we want. In our
scenario, there are 3 types currently, record, timestamp and watermark. So
the return type of `poll` should be tuple3 or something like that. It looks
a little hacky IMO.

The `advance()` style is more like RecordReader
<https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/RecordReader.html>
of
MapReduce, or ISpout
<https://storm.apache.org/releases/1.1.2/javadocs/org/apache/storm/spout/ISpout.html>
of
Storm. It means moving the offset forward indeed. It makes sense to me.
To be honest I like `advance()` style more.

And there is also another small point I can't get.

Why use `start()` and `close()` in `SplitReader`? `start()` makes me think
of "starting a thread" or something like that. We should not assume there
would be some thread. I prefer `open()`, it also matches the `close()`
better.


Becket Qin <[hidden email]> 于2018年11月6日周二 上午11:04写道:

> Thanks for updating the wiki, Aljoscha.
>
> The isDone()/advance()/getCurrent() API looks more similar to
> hasNext()/isNextReady()/getNext(), but implying some different behaviors.
>
> If users call getCurrent() twice without calling advance() in between, will
> they get the same record back? From the API itself, users might think
> advance() is the API that moves the offset forward, and getCurrent() just
> return the record at the current offset.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Mon, Nov 5, 2018 at 10:41 PM Aljoscha Krettek <[hidden email]>
> wrote:
>
> > I updated the FLIP [1] with some Javadoc for the SplitReader to outline
> > what I had in mind with the interface. Sorry for not doing that earlier,
> > it's not quite clear how the methods should work from the name alone.
> >
> > The gist of it is that advance() should be non-blocking, so
> > isDone/advance()/getCurrent() are very similar to isDone()/poll()/take()
> > that I have seen mentioned.
> >
> > [1]
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > <
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > >
> >
> > > On 5. Nov 2018, at 11:05, Biao Liu <[hidden email]> wrote:
> > >
> > > Thanks Aljoscha for bringing us this discussion!
> > >
> > > 1. I think one of the reason about separating `advance()` and
> > > `getCurrent()` is that we have several different types returned by
> > source.
> > > Not just the `record`, but also the timestamp of record and the
> > watermark.
> > > If we don't separate these into different methods, the source has to
> > return
> > > a tuple3 which is not so user friendly. The prototype of Aljoscha is
> > > acceptable to me. Regarding the specific method name, I'm not sure
> which
> > > one is better. Both of them are reasonable for me.
> > >
> > > 2. As Thomas and Becket mentioned before, I think a non-blocking API is
> > > necessary. Moreover, IMO we should not offer a blocking API. It doesn't
> > > help but makes things more complicated.
> > >
> > > 3. About the thread model.
> > > I agree with Thomas about the thread-less IO model. A standard workflow
> > > should look like below.
> > >  - If there is available data, Flink would read it.
> > >  - If there is no data available temporary, Flink would check again a
> > > moment later. Maybe waiting on a semaphore until a timer wake it up.
> > > Furthermore, we can offer an optional optimization for source which has
> > > external thread. Like Guowei mentioned, there can be a listener which
> the
> > > reader can wake the framework up as soon as new data comes. This can
> > solve
> > > Piotr's concern about efficiency.
> > >
> > > 4. One more thing. After taking a look at the prototype codes. Off the
> > top
> > > of my head, the implementation is more fit for batch job not streaming
> > job.
> > > There are two types of tasks in prototype. First is a source task that
> > > discovers the splits. The source passes the splits to the second task
> > which
> > > process the splits one by one. And then the source keeps watch to
> > discover
> > > more splits.
> > >
> > > However, I think the more common scenario of streaming job is:
> > > there are fixed splits, each of the subtasks takes several splits. The
> > > subtasks just keep processing the fixed splits. There would be
> continuous
> > > datum in each split. We don't need a source task to discover more
> splits.
> > > It can not be finished in streaming job since we don't want the
> > processing
> > > task finished even there are no more splits.
> > >
> > > So IMO we should offer another source operator for the new interface.
> It
> > > would discover all splits when it is opening. Then picks the splits
> > belong
> > > to this subtask. Keep processing these splits until all of them are
> > > finished.
> > >
> > >
> > > Becket Qin <[hidden email]> 于2018年11月5日周一 上午11:00写道:
> > >
> > >> Hi Thomas,
> > >>
> > >> The iterator-like API was also the first thing that came to me. But it
> > >> seems a little confusing that hasNext() does not mean "the stream has
> > not
> > >> ended", but means "the next record is ready", which is repurposing the
> > well
> > >> known meaning of hasNext(). If we follow the hasNext()/next() pattern,
> > an
> > >> additional isNextReady() method to indicate whether the next record is
> > >> ready seems more intuitive to me.
> > >>
> > >> Similarly, in poll()/take() pattern, another method of isDone() is
> > needed
> > >> to indicate whether the stream has ended or not.
> > >>
> > >> Compared with hasNext()/next()/isNextReady() pattern,
> > >> isDone()/poll()/take() seems more flexible for the reader
> > implementation.
> > >> When I am implementing a reader, I could have a couple of choices:
> > >>
> > >>   - A thread-less reader that does not have any internal thread.
> > >>   - When poll() is called, the same calling thread will perform a
> bunch
> > of
> > >>      IO asynchronously.
> > >>      - When take() is called, the same calling thread will perform a
> > bunch
> > >>      of IO and wait until the record is ready.
> > >>   - A reader with internal threads performing network IO and put
> records
> > >>   into a buffer.
> > >>      - When poll() is called, the calling thread simply reads from the
> > >>      buffer and return empty result immediately if there is no record.
> > >>      - When take() is called, the calling thread reads from the buffer
> > and
> > >>      block waiting if the buffer is empty.
> > >>
> > >> On the other hand, with the hasNext()/next()/isNextReady() API, it is
> > less
> > >> intuitive for the reader developers to write the thread-less pattern.
> > >> Although technically speaking one can still do the asynchronous IO to
> > >> prepare the record in isNextReady(). But it is inexplicit and seems
> > >> somewhat hacky.
> > >>
> > >> Thanks,
> > >>
> > >> Jiangjie (Becket) Qin
> > >>
> > >>
> > >>
> > >> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
> > >>
> > >>> Couple more points regarding discovery:
> > >>>
> > >>> The proposal mentions that discovery could be outside the execution
> > >> graph.
> > >>> Today, discovered partitions/shards are checkpointed. I believe that
> > will
> > >>> also need to be the case in the future, even when discovery and
> reading
> > >> are
> > >>> split between different tasks.
> > >>>
> > >>> For cases such as resharding of a Kinesis stream, the relationship
> > >> between
> > >>> splits needs to be considered. Splits cannot be randomly distributed
> > over
> > >>> readers in certain situations. An example was mentioned here:
> > >>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> > >>>
> > >>> Thomas
> > >>>
> > >>>
> > >>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
> > >>>
> > >>>> Thanks for getting the ball rolling on this!
> > >>>>
> > >>>> Can the number of splits decrease? Yes, splits can be closed and go
> > >> away.
> > >>>> An example would be a shard merge in Kinesis (2 existing shards will
> > be
> > >>>> closed and replaced with a new shard).
> > >>>>
> > >>>> Regarding advance/poll/take: IMO the least restrictive approach
> would
> > >> be
> > >>>> the thread-less IO model (pull based, non-blocking, caller retrieves
> > >> new
> > >>>> records when available). The current Kinesis API requires the use of
> > >>>> threads. But that can be internal to the split reader and does not
> > need
> > >>> to
> > >>>> be a source API concern. In fact, that's what we are working on
> right
> > >> now
> > >>>> as improvement to the existing consumer: Each shard consumer thread
> > >> will
> > >>>> push to a queue, the consumer main thread will poll the queue(s). It
> > is
> > >>>> essentially a mapping from threaded IO to non-blocking.
> > >>>>
> > >>>> The proposed SplitReader interface would fit the thread-less IO
> model.
> > >>>> Similar to an iterator, we find out if there is a new element
> > (hasNext)
> > >>> and
> > >>>> if so, move to it (next()). Separate calls deliver the meta
> > information
> > >>>> (timestamp, watermark). Perhaps advance call could offer a timeout
> > >>> option,
> > >>>> so that the caller does not end up in a busy wait. On the other
> hand,
> > a
> > >>>> caller processing multiple splits may want to cycle through fast, to
> > >>>> process elements of other splits as soon as they become available.
> The
> > >>> nice
> > >>>> thing is that this "split merge" logic can now live in Flink and be
> > >>>> optimized and shared between different sources.
> > >>>>
> > >>>> Thanks,
> > >>>> Thomas
> > >>>>
> > >>>>
> > >>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]>
> > wrote:
> > >>>>
> > >>>>> Hi,
> > >>>>> Thanks Aljoscha for this FLIP.
> > >>>>>
> > >>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> very
> > >>>>> important. But in addition to `Future/poll`, there may be another
> way
> > >> to
> > >>>>> achieve this. I think it may be not very memory friendly if every
> > >>> advance
> > >>>>> call return a Future.
> > >>>>>
> > >>>>> public interface Listener {
> > >>>>>     public void notify();
> > >>>>> }
> > >>>>>
> > >>>>> public interface SplitReader() {
> > >>>>>     /**
> > >>>>>      * When there is no element temporarily, this will return
> false.
> > >>>>>      * When elements is available again splitReader can call
> > >>>>> listener.notify()
> > >>>>>      * In addition the frame would check `advance` periodically .
> > >>>>>      * Of course advance can always return true and ignore the
> > >> listener
> > >>>>> argument for simplicity.
> > >>>>>      */
> > >>>>>     public boolean advance(Listener listener);
> > >>>>> }
> > >>>>>
> > >>>>> 2.  The FLIP tells us very clearly that how to create all Splits
> and
> > >> how
> > >>>>> to create a SplitReader from a Split. But there is no strategy for
> > the
> > >>> user
> > >>>>> to choose how to assign the splits to the tasks. I think we could
> add
> > >> a
> > >>>>> Enum to let user to choose.
> > >>>>> /**
> > >>>>>  public Enum SplitsAssignmentPolicy {
> > >>>>>    Location,
> > >>>>>    Workload,
> > >>>>>    Random,
> > >>>>>    Average
> > >>>>>  }
> > >>>>> */
> > >>>>>
> > >>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> > >> `getNext`
> > >>>>> the `getNext` would need return a `ElementWithTimestamp` because
> some
> > >>>>> sources want to add timestamp to every element. IMO, this is not so
> > >>> memory
> > >>>>> friendly so I prefer this design.
> > >>>>>
> > >>>>>
> > >>>>>
> > >>>>> Thanks
> > >>>>>
> > >>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> > >>>>>
> > >>>>>> Hi,
> > >>>>>>
> > >>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> > other
> > >>>>>> possible improvements. I have one proposal. Instead of having a
> > >> method:
> > >>>>>>
> > >>>>>> boolean advance() throws IOException;
> > >>>>>>
> > >>>>>> I would replace it with
> > >>>>>>
> > >>>>>> /*
> > >>>>>> * Return a future, which when completed means that source has more
> > >>> data
> > >>>>>> and getNext() will not block.
> > >>>>>> * If you wish to use benefits of non blocking connectors, please
> > >>>>>> implement this method appropriately.
> > >>>>>> */
> > >>>>>> default CompletableFuture<?> isBlocked() {
> > >>>>>>        return CompletableFuture.completedFuture(null);
> > >>>>>> }
> > >>>>>>
> > >>>>>> And rename `getCurrent()` to `getNext()`.
> > >>>>>>
> > >>>>>> Couple of arguments:
> > >>>>>> 1. I don’t understand the division of work between `advance()` and
> > >>>>>> `getCurrent()`. What should be done in which, especially for
> > >> connectors
> > >>>>>> that handle records in batches (like Kafka) and when should you
> call
> > >>>>>> `advance` and when `getCurrent()`.
> > >>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us
> in
> > >> the
> > >>>>>> future to have asynchronous/non blocking connectors and more
> > >>> efficiently
> > >>>>>> handle large number of blocked threads, without busy waiting.
> While
> > >> at
> > >>> the
> > >>>>>> same time it doesn’t add much complexity, since naive connector
> > >>>>>> implementations can be always blocking.
> > >>>>>> 3. This also would allow us to use a fixed size thread pool of
> task
> > >>>>>> executors, instead of one thread per task.
> > >>>>>>
> > >>>>>> Piotrek
> > >>>>>>
> > >>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> > >>>>>> wrote:
> > >>>>>>>
> > >>>>>>> Hi All,
> > >>>>>>>
> > >>>>>>> In order to finally get the ball rolling on the new source
> > >> interface
> > >>>>>> that we have discussed for so long I finally created a FLIP:
> > >>>>>>
> > >>>
> > >>
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > >>>>>>>
> > >>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> > >> about
> > >>>>>> adding per-partition watermark support to the Kinesis source and
> > >>> because
> > >>>>>> this would enable generic implementation of event-time alignment
> for
> > >>> all
> > >>>>>> sources. Maybe we need another FLIP for the event-time alignment
> > >> part,
> > >>>>>> especially the part about information sharing between operations
> > (I'm
> > >>> not
> > >>>>>> calling it state sharing because state has a special meaning in
> > >> Flink).
> > >>>>>>>
> > >>>>>>> Please discuss away!
> > >>>>>>>
> > >>>>>>> Aljoscha
> > >>>>>>>
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>
> > >>
> >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Hi Biao,

Thanks for the explanation. The current API makes more sense to me now. It
basically means:
1. Readers should all be non-blocking
2. The offset advancing and the record fetching are two steps.
3. After each advance() call, the currentRecord, currentTimestamp and
watermark will all be updated at the same time. And those values can be
accessed multiple times.

That being said, with the poll()/take() method, we don't have to return
tuple3. poll()/take() will just return the record. It means:
1. Readers could be blocking (take()) or non-blocking(poll())
2. The offset advancing and the record fetching are combined into one step,
i.e. poll()/take()
3. After each poll()/take(), the currentTimestamp and watermark are
updated. That means after poll()/take(), users can call
getCurrentTimestamp() or getWatermark() to get the information at the point
after the previous record was returned.

One concern I have for the completely non-blocking reader is that it would
be difficult to implement a blocking behavior on top of the thread-less
non-blocking reader. If advance() returns false, since the reader is
thread-less, unless the caller thread call something on the reader, no
progress will be made. Hence the caller has to call advance() again to
check, either with a backoff (introducing some latency) or a tight loop.
But neither of them is ideal. From this perspective, I think it is useful
to have a blocking() API in the reader, so the blocking behavior could be
done efficiently, e.g. by using a NIO selector which relies on the OS
signals.

WRT to the SplitEnumerator, I still feel that it would be better for the
SplitEnumerator to not only return new splits but all the splits to cover
the splits shrink case. Also, it took me a while to understand why
*createInitialEnumeratorCheckpoint()
*is needed. I am wondering whether it would be better to replace it with a
parameter-less *createSplitEnumerator(). *

Thanks,

Jiangjie (Becket) Qin



On Tue, Nov 6, 2018 at 11:40 PM Biao Liu <[hidden email]> wrote:

> Regarding the naming style.
>
> The advantage of `poll()` style is that basically the name of `poll` means
> it should be a non-blocking operator, same with `Queue` in Java API. It's
> easy to understand. We don't need to write too much in docs to imply the
> implementation should not do something heavy.
> However `poll` also means it should return the thing we want. In our
> scenario, there are 3 types currently, record, timestamp and watermark. So
> the return type of `poll` should be tuple3 or something like that. It looks
> a little hacky IMO.
>
> The `advance()` style is more like RecordReader
> <
> https://hadoop.apache.org/docs/current/api/org/apache/hadoop/mapreduce/RecordReader.html
> >
> of
> MapReduce, or ISpout
> <
> https://storm.apache.org/releases/1.1.2/javadocs/org/apache/storm/spout/ISpout.html
> >
> of
> Storm. It means moving the offset forward indeed. It makes sense to me.
> To be honest I like `advance()` style more.
>
> And there is also another small point I can't get.
>
> Why use `start()` and `close()` in `SplitReader`? `start()` makes me think
> of "starting a thread" or something like that. We should not assume there
> would be some thread. I prefer `open()`, it also matches the `close()`
> better.
>
>
> Becket Qin <[hidden email]> 于2018年11月6日周二 上午11:04写道:
>
> > Thanks for updating the wiki, Aljoscha.
> >
> > The isDone()/advance()/getCurrent() API looks more similar to
> > hasNext()/isNextReady()/getNext(), but implying some different behaviors.
> >
> > If users call getCurrent() twice without calling advance() in between,
> will
> > they get the same record back? From the API itself, users might think
> > advance() is the API that moves the offset forward, and getCurrent() just
> > return the record at the current offset.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Mon, Nov 5, 2018 at 10:41 PM Aljoscha Krettek <[hidden email]>
> > wrote:
> >
> > > I updated the FLIP [1] with some Javadoc for the SplitReader to outline
> > > what I had in mind with the interface. Sorry for not doing that
> earlier,
> > > it's not quite clear how the methods should work from the name alone.
> > >
> > > The gist of it is that advance() should be non-blocking, so
> > > isDone/advance()/getCurrent() are very similar to
> isDone()/poll()/take()
> > > that I have seen mentioned.
> > >
> > > [1]
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > <
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27:+Refactor+Source+Interface
> > > >
> > >
> > > > On 5. Nov 2018, at 11:05, Biao Liu <[hidden email]> wrote:
> > > >
> > > > Thanks Aljoscha for bringing us this discussion!
> > > >
> > > > 1. I think one of the reason about separating `advance()` and
> > > > `getCurrent()` is that we have several different types returned by
> > > source.
> > > > Not just the `record`, but also the timestamp of record and the
> > > watermark.
> > > > If we don't separate these into different methods, the source has to
> > > return
> > > > a tuple3 which is not so user friendly. The prototype of Aljoscha is
> > > > acceptable to me. Regarding the specific method name, I'm not sure
> > which
> > > > one is better. Both of them are reasonable for me.
> > > >
> > > > 2. As Thomas and Becket mentioned before, I think a non-blocking API
> is
> > > > necessary. Moreover, IMO we should not offer a blocking API. It
> doesn't
> > > > help but makes things more complicated.
> > > >
> > > > 3. About the thread model.
> > > > I agree with Thomas about the thread-less IO model. A standard
> workflow
> > > > should look like below.
> > > >  - If there is available data, Flink would read it.
> > > >  - If there is no data available temporary, Flink would check again a
> > > > moment later. Maybe waiting on a semaphore until a timer wake it up.
> > > > Furthermore, we can offer an optional optimization for source which
> has
> > > > external thread. Like Guowei mentioned, there can be a listener which
> > the
> > > > reader can wake the framework up as soon as new data comes. This can
> > > solve
> > > > Piotr's concern about efficiency.
> > > >
> > > > 4. One more thing. After taking a look at the prototype codes. Off
> the
> > > top
> > > > of my head, the implementation is more fit for batch job not
> streaming
> > > job.
> > > > There are two types of tasks in prototype. First is a source task
> that
> > > > discovers the splits. The source passes the splits to the second task
> > > which
> > > > process the splits one by one. And then the source keeps watch to
> > > discover
> > > > more splits.
> > > >
> > > > However, I think the more common scenario of streaming job is:
> > > > there are fixed splits, each of the subtasks takes several splits.
> The
> > > > subtasks just keep processing the fixed splits. There would be
> > continuous
> > > > datum in each split. We don't need a source task to discover more
> > splits.
> > > > It can not be finished in streaming job since we don't want the
> > > processing
> > > > task finished even there are no more splits.
> > > >
> > > > So IMO we should offer another source operator for the new interface.
> > It
> > > > would discover all splits when it is opening. Then picks the splits
> > > belong
> > > > to this subtask. Keep processing these splits until all of them are
> > > > finished.
> > > >
> > > >
> > > > Becket Qin <[hidden email]> 于2018年11月5日周一 上午11:00写道:
> > > >
> > > >> Hi Thomas,
> > > >>
> > > >> The iterator-like API was also the first thing that came to me. But
> it
> > > >> seems a little confusing that hasNext() does not mean "the stream
> has
> > > not
> > > >> ended", but means "the next record is ready", which is repurposing
> the
> > > well
> > > >> known meaning of hasNext(). If we follow the hasNext()/next()
> pattern,
> > > an
> > > >> additional isNextReady() method to indicate whether the next record
> is
> > > >> ready seems more intuitive to me.
> > > >>
> > > >> Similarly, in poll()/take() pattern, another method of isDone() is
> > > needed
> > > >> to indicate whether the stream has ended or not.
> > > >>
> > > >> Compared with hasNext()/next()/isNextReady() pattern,
> > > >> isDone()/poll()/take() seems more flexible for the reader
> > > implementation.
> > > >> When I am implementing a reader, I could have a couple of choices:
> > > >>
> > > >>   - A thread-less reader that does not have any internal thread.
> > > >>   - When poll() is called, the same calling thread will perform a
> > bunch
> > > of
> > > >>      IO asynchronously.
> > > >>      - When take() is called, the same calling thread will perform a
> > > bunch
> > > >>      of IO and wait until the record is ready.
> > > >>   - A reader with internal threads performing network IO and put
> > records
> > > >>   into a buffer.
> > > >>      - When poll() is called, the calling thread simply reads from
> the
> > > >>      buffer and return empty result immediately if there is no
> record.
> > > >>      - When take() is called, the calling thread reads from the
> buffer
> > > and
> > > >>      block waiting if the buffer is empty.
> > > >>
> > > >> On the other hand, with the hasNext()/next()/isNextReady() API, it
> is
> > > less
> > > >> intuitive for the reader developers to write the thread-less
> pattern.
> > > >> Although technically speaking one can still do the asynchronous IO
> to
> > > >> prepare the record in isNextReady(). But it is inexplicit and seems
> > > >> somewhat hacky.
> > > >>
> > > >> Thanks,
> > > >>
> > > >> Jiangjie (Becket) Qin
> > > >>
> > > >>
> > > >>
> > > >> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
> > > >>
> > > >>> Couple more points regarding discovery:
> > > >>>
> > > >>> The proposal mentions that discovery could be outside the execution
> > > >> graph.
> > > >>> Today, discovered partitions/shards are checkpointed. I believe
> that
> > > will
> > > >>> also need to be the case in the future, even when discovery and
> > reading
> > > >> are
> > > >>> split between different tasks.
> > > >>>
> > > >>> For cases such as resharding of a Kinesis stream, the relationship
> > > >> between
> > > >>> splits needs to be considered. Splits cannot be randomly
> distributed
> > > over
> > > >>> readers in certain situations. An example was mentioned here:
> > > >>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> > > >>>
> > > >>> Thomas
> > > >>>
> > > >>>
> > > >>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]>
> wrote:
> > > >>>
> > > >>>> Thanks for getting the ball rolling on this!
> > > >>>>
> > > >>>> Can the number of splits decrease? Yes, splits can be closed and
> go
> > > >> away.
> > > >>>> An example would be a shard merge in Kinesis (2 existing shards
> will
> > > be
> > > >>>> closed and replaced with a new shard).
> > > >>>>
> > > >>>> Regarding advance/poll/take: IMO the least restrictive approach
> > would
> > > >> be
> > > >>>> the thread-less IO model (pull based, non-blocking, caller
> retrieves
> > > >> new
> > > >>>> records when available). The current Kinesis API requires the use
> of
> > > >>>> threads. But that can be internal to the split reader and does not
> > > need
> > > >>> to
> > > >>>> be a source API concern. In fact, that's what we are working on
> > right
> > > >> now
> > > >>>> as improvement to the existing consumer: Each shard consumer
> thread
> > > >> will
> > > >>>> push to a queue, the consumer main thread will poll the queue(s).
> It
> > > is
> > > >>>> essentially a mapping from threaded IO to non-blocking.
> > > >>>>
> > > >>>> The proposed SplitReader interface would fit the thread-less IO
> > model.
> > > >>>> Similar to an iterator, we find out if there is a new element
> > > (hasNext)
> > > >>> and
> > > >>>> if so, move to it (next()). Separate calls deliver the meta
> > > information
> > > >>>> (timestamp, watermark). Perhaps advance call could offer a timeout
> > > >>> option,
> > > >>>> so that the caller does not end up in a busy wait. On the other
> > hand,
> > > a
> > > >>>> caller processing multiple splits may want to cycle through fast,
> to
> > > >>>> process elements of other splits as soon as they become available.
> > The
> > > >>> nice
> > > >>>> thing is that this "split merge" logic can now live in Flink and
> be
> > > >>>> optimized and shared between different sources.
> > > >>>>
> > > >>>> Thanks,
> > > >>>> Thomas
> > > >>>>
> > > >>>>
> > > >>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]>
> > > wrote:
> > > >>>>
> > > >>>>> Hi,
> > > >>>>> Thanks Aljoscha for this FLIP.
> > > >>>>>
> > > >>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> > very
> > > >>>>> important. But in addition to `Future/poll`, there may be another
> > way
> > > >> to
> > > >>>>> achieve this. I think it may be not very memory friendly if every
> > > >>> advance
> > > >>>>> call return a Future.
> > > >>>>>
> > > >>>>> public interface Listener {
> > > >>>>>     public void notify();
> > > >>>>> }
> > > >>>>>
> > > >>>>> public interface SplitReader() {
> > > >>>>>     /**
> > > >>>>>      * When there is no element temporarily, this will return
> > false.
> > > >>>>>      * When elements is available again splitReader can call
> > > >>>>> listener.notify()
> > > >>>>>      * In addition the frame would check `advance` periodically .
> > > >>>>>      * Of course advance can always return true and ignore the
> > > >> listener
> > > >>>>> argument for simplicity.
> > > >>>>>      */
> > > >>>>>     public boolean advance(Listener listener);
> > > >>>>> }
> > > >>>>>
> > > >>>>> 2.  The FLIP tells us very clearly that how to create all Splits
> > and
> > > >> how
> > > >>>>> to create a SplitReader from a Split. But there is no strategy
> for
> > > the
> > > >>> user
> > > >>>>> to choose how to assign the splits to the tasks. I think we could
> > add
> > > >> a
> > > >>>>> Enum to let user to choose.
> > > >>>>> /**
> > > >>>>>  public Enum SplitsAssignmentPolicy {
> > > >>>>>    Location,
> > > >>>>>    Workload,
> > > >>>>>    Random,
> > > >>>>>    Average
> > > >>>>>  }
> > > >>>>> */
> > > >>>>>
> > > >>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> > > >> `getNext`
> > > >>>>> the `getNext` would need return a `ElementWithTimestamp` because
> > some
> > > >>>>> sources want to add timestamp to every element. IMO, this is not
> so
> > > >>> memory
> > > >>>>> friendly so I prefer this design.
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> Thanks
> > > >>>>>
> > > >>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> > > >>>>>
> > > >>>>>> Hi,
> > > >>>>>>
> > > >>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> > > other
> > > >>>>>> possible improvements. I have one proposal. Instead of having a
> > > >> method:
> > > >>>>>>
> > > >>>>>> boolean advance() throws IOException;
> > > >>>>>>
> > > >>>>>> I would replace it with
> > > >>>>>>
> > > >>>>>> /*
> > > >>>>>> * Return a future, which when completed means that source has
> more
> > > >>> data
> > > >>>>>> and getNext() will not block.
> > > >>>>>> * If you wish to use benefits of non blocking connectors, please
> > > >>>>>> implement this method appropriately.
> > > >>>>>> */
> > > >>>>>> default CompletableFuture<?> isBlocked() {
> > > >>>>>>        return CompletableFuture.completedFuture(null);
> > > >>>>>> }
> > > >>>>>>
> > > >>>>>> And rename `getCurrent()` to `getNext()`.
> > > >>>>>>
> > > >>>>>> Couple of arguments:
> > > >>>>>> 1. I don’t understand the division of work between `advance()`
> and
> > > >>>>>> `getCurrent()`. What should be done in which, especially for
> > > >> connectors
> > > >>>>>> that handle records in batches (like Kafka) and when should you
> > call
> > > >>>>>> `advance` and when `getCurrent()`.
> > > >>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us
> > in
> > > >> the
> > > >>>>>> future to have asynchronous/non blocking connectors and more
> > > >>> efficiently
> > > >>>>>> handle large number of blocked threads, without busy waiting.
> > While
> > > >> at
> > > >>> the
> > > >>>>>> same time it doesn’t add much complexity, since naive connector
> > > >>>>>> implementations can be always blocking.
> > > >>>>>> 3. This also would allow us to use a fixed size thread pool of
> > task
> > > >>>>>> executors, instead of one thread per task.
> > > >>>>>>
> > > >>>>>> Piotrek
> > > >>>>>>
> > > >>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <
> [hidden email]>
> > > >>>>>> wrote:
> > > >>>>>>>
> > > >>>>>>> Hi All,
> > > >>>>>>>
> > > >>>>>>> In order to finally get the ball rolling on the new source
> > > >> interface
> > > >>>>>> that we have discussed for so long I finally created a FLIP:
> > > >>>>>>
> > > >>>
> > > >>
> > >
> >
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> > > >>>>>>>
> > > >>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> > > >> about
> > > >>>>>> adding per-partition watermark support to the Kinesis source and
> > > >>> because
> > > >>>>>> this would enable generic implementation of event-time alignment
> > for
> > > >>> all
> > > >>>>>> sources. Maybe we need another FLIP for the event-time alignment
> > > >> part,
> > > >>>>>> especially the part about information sharing between operations
> > > (I'm
> > > >>> not
> > > >>>>>> calling it state sharing because state has a special meaning in
> > > >> Flink).
> > > >>>>>>>
> > > >>>>>>> Please discuss away!
> > > >>>>>>>
> > > >>>>>>> Aljoscha
> > > >>>>>>>
> > > >>>>>>>
> > > >>>>>>
> > > >>>>>>
> > > >>>
> > > >>
> > >
> > >
> >
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Piotr Nowojski
In reply to this post by Becket Qin
Hi,

a)

> BTW, regarding the isBlock() method, I have a few more questions. 21, Is a method isReady() with boolean as a return value
> equivalent? Personally I found it is a little bit confusing in what is supposed to be returned when the future is completed. 22. if
> the implementation of isBlocked() is optional, how do the callers know whether the method is properly implemented or not?
> Does not implemented mean it always return a completed future?

`CompletableFuture<?> isBlocked()` is more or less an equivalent to `boolean hasNext()` which in case of “false” provides some kind of a listener/callback that notifies about presence of next element. There are some minor details, like `CompletableFuture<?>` has a minimal two state logic:

1. Future is completed - we have more data
2. Future not yet completed - we don’t have data now, but we might/we will have in the future

While `boolean hasNext()` and `notify()` callback are a bit more complicated/dispersed and can lead/encourage `notify()` spam.

b)

> 3. If merge the `advance` and `getCurrent`  to one method like `getNext` the `getNext` would need return a
>`ElementWithTimestamp` because some sources want to add timestamp to every element. IMO, this is not so memory friendly
> so I prefer this design.

Guowei I don’t quite understand this. Could you elaborate why having a separate `advance()` help?

c)

Regarding advance/poll/take. What’s the value of having two separate methods: poll and take? Which one of them should be called and which implemented? What’s the benefit of having those methods compared to having a one single method `getNextElement()` (or `pollElement() or whatever we name it) with following contract:

CompletableFuture<?> isBlocked();

/**
Return next element - will be called only if `isBlocked()` is completed. Try to implement it in non blocking fashion, but if that’s impossible or you just don’t need the effort, you can block in this method.
*/
T getNextElement();

I mean, if the connector is implemented non-blockingly, Flink should use it that way. If it’s not, then `poll()` will `throw new NotImplementedException()`. Implementing both of them and providing both of them to Flink wouldn’t make a sense, thus why not merge them into a single method call that should preferably (but not necessarily need to) be non-blocking? It’s not like we are implementing general purpose `Queue`, which users might want to call either of `poll` or `take`. We would always prefer to call `poll`, but if it’s blocking, then still we have no choice, but to call it and block on it.

d)

> 1. I agree with Piotr and Becket that the non-blocking source is very
> important. But in addition to `Future/poll`, there may be another way to
> achieve this. I think it may be not very memory friendly if every advance
> call return a Future.

I didn’t want to mention this, to not clog my initial proposal, but there is a simple solution for the problem:

public interface SplitReader {
   
    (…)

    CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    /**
     * Returns a future that will be completed when the page source becomes
     * unblocked.  If the page source is not blocked, this method should return
     * {@code NOT_BLOCKED}.
     */
    default CompletableFuture<?> isBlocked()
    {
        return NOT_BLOCKED;
    }

If we are blocked and we are waiting for the IO, then creating a new Future is non-issue. Under full throttle/throughput and not blocked sources returning a static `NOT_BLOCKED` constant  should also solve the problem.

One more remark, non-blocking sources might be a necessity in a single threaded model without a checkpointing lock. (Currently when sources are blocked, they can release checkpointing lock and re-acquire it again later). Non-blocking `poll`/`getNext()` would allow for checkpoints to happen when source is idling. In that case either `notify()` or my proposed `isBlocked()` would allow to avoid busy-looping.

Piotrek

> On 5 Nov 2018, at 03:59, Becket Qin <[hidden email]> wrote:
>
> Hi Thomas,
>
> The iterator-like API was also the first thing that came to me. But it
> seems a little confusing that hasNext() does not mean "the stream has not
> ended", but means "the next record is ready", which is repurposing the well
> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
> additional isNextReady() method to indicate whether the next record is
> ready seems more intuitive to me.
>
> Similarly, in poll()/take() pattern, another method of isDone() is needed
> to indicate whether the stream has ended or not.
>
> Compared with hasNext()/next()/isNextReady() pattern,
> isDone()/poll()/take() seems more flexible for the reader implementation.
> When I am implementing a reader, I could have a couple of choices:
>
>   - A thread-less reader that does not have any internal thread.
>   - When poll() is called, the same calling thread will perform a bunch of
>      IO asynchronously.
>      - When take() is called, the same calling thread will perform a bunch
>      of IO and wait until the record is ready.
>   - A reader with internal threads performing network IO and put records
>   into a buffer.
>      - When poll() is called, the calling thread simply reads from the
>      buffer and return empty result immediately if there is no record.
>      - When take() is called, the calling thread reads from the buffer and
>      block waiting if the buffer is empty.
>
> On the other hand, with the hasNext()/next()/isNextReady() API, it is less
> intuitive for the reader developers to write the thread-less pattern.
> Although technically speaking one can still do the asynchronous IO to
> prepare the record in isNextReady(). But it is inexplicit and seems
> somewhat hacky.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
>
>> Couple more points regarding discovery:
>>
>> The proposal mentions that discovery could be outside the execution graph.
>> Today, discovered partitions/shards are checkpointed. I believe that will
>> also need to be the case in the future, even when discovery and reading are
>> split between different tasks.
>>
>> For cases such as resharding of a Kinesis stream, the relationship between
>> splits needs to be considered. Splits cannot be randomly distributed over
>> readers in certain situations. An example was mentioned here:
>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>
>> Thomas
>>
>>
>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
>>
>>> Thanks for getting the ball rolling on this!
>>>
>>> Can the number of splits decrease? Yes, splits can be closed and go away.
>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>> closed and replaced with a new shard).
>>>
>>> Regarding advance/poll/take: IMO the least restrictive approach would be
>>> the thread-less IO model (pull based, non-blocking, caller retrieves new
>>> records when available). The current Kinesis API requires the use of
>>> threads. But that can be internal to the split reader and does not need
>> to
>>> be a source API concern. In fact, that's what we are working on right now
>>> as improvement to the existing consumer: Each shard consumer thread will
>>> push to a queue, the consumer main thread will poll the queue(s). It is
>>> essentially a mapping from threaded IO to non-blocking.
>>>
>>> The proposed SplitReader interface would fit the thread-less IO model.
>>> Similar to an iterator, we find out if there is a new element (hasNext)
>> and
>>> if so, move to it (next()). Separate calls deliver the meta information
>>> (timestamp, watermark). Perhaps advance call could offer a timeout
>> option,
>>> so that the caller does not end up in a busy wait. On the other hand, a
>>> caller processing multiple splits may want to cycle through fast, to
>>> process elements of other splits as soon as they become available. The
>> nice
>>> thing is that this "split merge" logic can now live in Flink and be
>>> optimized and shared between different sources.
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
>>>
>>>> Hi,
>>>> Thanks Aljoscha for this FLIP.
>>>>
>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>> important. But in addition to `Future/poll`, there may be another way to
>>>> achieve this. I think it may be not very memory friendly if every
>> advance
>>>> call return a Future.
>>>>
>>>> public interface Listener {
>>>>     public void notify();
>>>> }
>>>>
>>>> public interface SplitReader() {
>>>>     /**
>>>>      * When there is no element temporarily, this will return false.
>>>>      * When elements is available again splitReader can call
>>>> listener.notify()
>>>>      * In addition the frame would check `advance` periodically .
>>>>      * Of course advance can always return true and ignore the listener
>>>> argument for simplicity.
>>>>      */
>>>>     public boolean advance(Listener listener);
>>>> }
>>>>
>>>> 2.  The FLIP tells us very clearly that how to create all Splits and how
>>>> to create a SplitReader from a Split. But there is no strategy for the
>> user
>>>> to choose how to assign the splits to the tasks. I think we could add a
>>>> Enum to let user to choose.
>>>> /**
>>>>  public Enum SplitsAssignmentPolicy {
>>>>    Location,
>>>>    Workload,
>>>>    Random,
>>>>    Average
>>>>  }
>>>> */
>>>>
>>>> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
>>>> the `getNext` would need return a `ElementWithTimestamp` because some
>>>> sources want to add timestamp to every element. IMO, this is not so
>> memory
>>>> friendly so I prefer this design.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>> possible improvements. I have one proposal. Instead of having a method:
>>>>>
>>>>> boolean advance() throws IOException;
>>>>>
>>>>> I would replace it with
>>>>>
>>>>> /*
>>>>> * Return a future, which when completed means that source has more
>> data
>>>>> and getNext() will not block.
>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>> implement this method appropriately.
>>>>> */
>>>>> default CompletableFuture<?> isBlocked() {
>>>>>        return CompletableFuture.completedFuture(null);
>>>>> }
>>>>>
>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>
>>>>> Couple of arguments:
>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>> `getCurrent()`. What should be done in which, especially for connectors
>>>>> that handle records in batches (like Kafka) and when should you call
>>>>> `advance` and when `getCurrent()`.
>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in the
>>>>> future to have asynchronous/non blocking connectors and more
>> efficiently
>>>>> handle large number of blocked threads, without busy waiting. While at
>> the
>>>>> same time it doesn’t add much complexity, since naive connector
>>>>> implementations can be always blocking.
>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>> executors, instead of one thread per task.
>>>>>
>>>>> Piotrek
>>>>>
>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
>>>>> wrote:
>>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> In order to finally get the ball rolling on the new source interface
>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>
>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion about
>>>>> adding per-partition watermark support to the Kinesis source and
>> because
>>>>> this would enable generic implementation of event-time alignment for
>> all
>>>>> sources. Maybe we need another FLIP for the event-time alignment part,
>>>>> especially the part about information sharing between operations (I'm
>> not
>>>>> calling it state sharing because state has a special meaning in Flink).
>>>>>>
>>>>>> Please discuss away!
>>>>>>
>>>>>> Aljoscha
>>>>>>
>>>>>>
>>>>>
>>>>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Hi Piotr,

I might have misunderstood you proposal. But let me try to explain my
concern. I am thinking about the following case:
1. a reader has the following two interfaces,
    boolean isBlocked()
    T getNextElement()
2. the implementation of getNextElement() is non-blocking.
3. The reader is thread-less, i.e. it does not have any internal thread.
For example, it might just delegate the getNextElement() to a queue.poll(),
and isBlocked() is just queue.isEmpty().

How can Flink efficiently implement a blocking reading behavior with this
reader? Either a tight loop or a backoff interval is needed. Neither of
them is ideal.

Now let's say in the reader mentioned above implements a blocking
getNextElement() method. Because there is no internal thread in the reader,
after isBlocked() returns false. Flink will still have to loop on
isBlocked() to check whether the next record is available. If the next
record reaches after 10 min, it is a tight loop for 10 min. You have
probably noticed that in this case, even isBlocked() returns a future, that
future() will not be completed if Flink does not call some method from the
reader, because the reader has no internal thread to complete that future
by itself.

Due to the above reasons, a blocking take() API would allow Flink to have
an efficient way to read from a reader. There are many ways to wake up the
blocking thread when checkpointing is needed depending on the
implementation. But I think the poll()/take() API would also work in that
case.

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <[hidden email]>
wrote:

> Hi,
>
> a)
>
> > BTW, regarding the isBlock() method, I have a few more questions. 21, Is
> a method isReady() with boolean as a return value
> > equivalent? Personally I found it is a little bit confusing in what is
> supposed to be returned when the future is completed. 22. if
> > the implementation of isBlocked() is optional, how do the callers know
> whether the method is properly implemented or not?
> > Does not implemented mean it always return a completed future?
>
> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
> `boolean hasNext()` which in case of “false” provides some kind of a
> listener/callback that notifies about presence of next element. There are
> some minor details, like `CompletableFuture<?>` has a minimal two state
> logic:
>
> 1. Future is completed - we have more data
> 2. Future not yet completed - we don’t have data now, but we might/we will
> have in the future
>
> While `boolean hasNext()` and `notify()` callback are a bit more
> complicated/dispersed and can lead/encourage `notify()` spam.
>
> b)
>
> > 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
> the `getNext` would need return a
> >`ElementWithTimestamp` because some sources want to add timestamp to
> every element. IMO, this is not so memory friendly
> > so I prefer this design.
>
> Guowei I don’t quite understand this. Could you elaborate why having a
> separate `advance()` help?
>
> c)
>
> Regarding advance/poll/take. What’s the value of having two separate
> methods: poll and take? Which one of them should be called and which
> implemented? What’s the benefit of having those methods compared to having
> a one single method `getNextElement()` (or `pollElement() or whatever we
> name it) with following contract:
>
> CompletableFuture<?> isBlocked();
>
> /**
> Return next element - will be called only if `isBlocked()` is completed.
> Try to implement it in non blocking fashion, but if that’s impossible or
> you just don’t need the effort, you can block in this method.
> */
> T getNextElement();
>
> I mean, if the connector is implemented non-blockingly, Flink should use
> it that way. If it’s not, then `poll()` will `throw new
> NotImplementedException()`. Implementing both of them and providing both of
> them to Flink wouldn’t make a sense, thus why not merge them into a single
> method call that should preferably (but not necessarily need to) be
> non-blocking? It’s not like we are implementing general purpose `Queue`,
> which users might want to call either of `poll` or `take`. We would always
> prefer to call `poll`, but if it’s blocking, then still we have no choice,
> but to call it and block on it.
>
> d)
>
> > 1. I agree with Piotr and Becket that the non-blocking source is very
> > important. But in addition to `Future/poll`, there may be another way to
> > achieve this. I think it may be not very memory friendly if every advance
> > call return a Future.
>
> I didn’t want to mention this, to not clog my initial proposal, but there
> is a simple solution for the problem:
>
> public interface SplitReader {
>
>     (…)
>
>     CompletableFuture<?> NOT_BLOCKED =
> CompletableFuture.completedFuture(null);
>
>     /**
>      * Returns a future that will be completed when the page source becomes
>      * unblocked.  If the page source is not blocked, this method should
> return
>      * {@code NOT_BLOCKED}.
>      */
>     default CompletableFuture<?> isBlocked()
>     {
>         return NOT_BLOCKED;
>     }
>
> If we are blocked and we are waiting for the IO, then creating a new
> Future is non-issue. Under full throttle/throughput and not blocked sources
> returning a static `NOT_BLOCKED` constant  should also solve the problem.
>
> One more remark, non-blocking sources might be a necessity in a single
> threaded model without a checkpointing lock. (Currently when sources are
> blocked, they can release checkpointing lock and re-acquire it again
> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
> happen when source is idling. In that case either `notify()` or my proposed
> `isBlocked()` would allow to avoid busy-looping.
>
> Piotrek
>
> > On 5 Nov 2018, at 03:59, Becket Qin <[hidden email]> wrote:
> >
> > Hi Thomas,
> >
> > The iterator-like API was also the first thing that came to me. But it
> > seems a little confusing that hasNext() does not mean "the stream has not
> > ended", but means "the next record is ready", which is repurposing the
> well
> > known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
> > additional isNextReady() method to indicate whether the next record is
> > ready seems more intuitive to me.
> >
> > Similarly, in poll()/take() pattern, another method of isDone() is needed
> > to indicate whether the stream has ended or not.
> >
> > Compared with hasNext()/next()/isNextReady() pattern,
> > isDone()/poll()/take() seems more flexible for the reader implementation.
> > When I am implementing a reader, I could have a couple of choices:
> >
> >   - A thread-less reader that does not have any internal thread.
> >   - When poll() is called, the same calling thread will perform a bunch
> of
> >      IO asynchronously.
> >      - When take() is called, the same calling thread will perform a
> bunch
> >      of IO and wait until the record is ready.
> >   - A reader with internal threads performing network IO and put records
> >   into a buffer.
> >      - When poll() is called, the calling thread simply reads from the
> >      buffer and return empty result immediately if there is no record.
> >      - When take() is called, the calling thread reads from the buffer
> and
> >      block waiting if the buffer is empty.
> >
> > On the other hand, with the hasNext()/next()/isNextReady() API, it is
> less
> > intuitive for the reader developers to write the thread-less pattern.
> > Although technically speaking one can still do the asynchronous IO to
> > prepare the record in isNextReady(). But it is inexplicit and seems
> > somewhat hacky.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> >
> >
> > On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
> >
> >> Couple more points regarding discovery:
> >>
> >> The proposal mentions that discovery could be outside the execution
> graph.
> >> Today, discovered partitions/shards are checkpointed. I believe that
> will
> >> also need to be the case in the future, even when discovery and reading
> are
> >> split between different tasks.
> >>
> >> For cases such as resharding of a Kinesis stream, the relationship
> between
> >> splits needs to be considered. Splits cannot be randomly distributed
> over
> >> readers in certain situations. An example was mentioned here:
> >> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>
> >> Thomas
> >>
> >>
> >> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
> >>
> >>> Thanks for getting the ball rolling on this!
> >>>
> >>> Can the number of splits decrease? Yes, splits can be closed and go
> away.
> >>> An example would be a shard merge in Kinesis (2 existing shards will be
> >>> closed and replaced with a new shard).
> >>>
> >>> Regarding advance/poll/take: IMO the least restrictive approach would
> be
> >>> the thread-less IO model (pull based, non-blocking, caller retrieves
> new
> >>> records when available). The current Kinesis API requires the use of
> >>> threads. But that can be internal to the split reader and does not need
> >> to
> >>> be a source API concern. In fact, that's what we are working on right
> now
> >>> as improvement to the existing consumer: Each shard consumer thread
> will
> >>> push to a queue, the consumer main thread will poll the queue(s). It is
> >>> essentially a mapping from threaded IO to non-blocking.
> >>>
> >>> The proposed SplitReader interface would fit the thread-less IO model.
> >>> Similar to an iterator, we find out if there is a new element (hasNext)
> >> and
> >>> if so, move to it (next()). Separate calls deliver the meta information
> >>> (timestamp, watermark). Perhaps advance call could offer a timeout
> >> option,
> >>> so that the caller does not end up in a busy wait. On the other hand, a
> >>> caller processing multiple splits may want to cycle through fast, to
> >>> process elements of other splits as soon as they become available. The
> >> nice
> >>> thing is that this "split merge" logic can now live in Flink and be
> >>> optimized and shared between different sources.
> >>>
> >>> Thanks,
> >>> Thomas
> >>>
> >>>
> >>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
> >>>
> >>>> Hi,
> >>>> Thanks Aljoscha for this FLIP.
> >>>>
> >>>> 1. I agree with Piotr and Becket that the non-blocking source is very
> >>>> important. But in addition to `Future/poll`, there may be another way
> to
> >>>> achieve this. I think it may be not very memory friendly if every
> >> advance
> >>>> call return a Future.
> >>>>
> >>>> public interface Listener {
> >>>>     public void notify();
> >>>> }
> >>>>
> >>>> public interface SplitReader() {
> >>>>     /**
> >>>>      * When there is no element temporarily, this will return false.
> >>>>      * When elements is available again splitReader can call
> >>>> listener.notify()
> >>>>      * In addition the frame would check `advance` periodically .
> >>>>      * Of course advance can always return true and ignore the
> listener
> >>>> argument for simplicity.
> >>>>      */
> >>>>     public boolean advance(Listener listener);
> >>>> }
> >>>>
> >>>> 2.  The FLIP tells us very clearly that how to create all Splits and
> how
> >>>> to create a SplitReader from a Split. But there is no strategy for the
> >> user
> >>>> to choose how to assign the splits to the tasks. I think we could add
> a
> >>>> Enum to let user to choose.
> >>>> /**
> >>>>  public Enum SplitsAssignmentPolicy {
> >>>>    Location,
> >>>>    Workload,
> >>>>    Random,
> >>>>    Average
> >>>>  }
> >>>> */
> >>>>
> >>>> 3. If merge the `advance` and `getCurrent`  to one method like
> `getNext`
> >>>> the `getNext` would need return a `ElementWithTimestamp` because some
> >>>> sources want to add timestamp to every element. IMO, this is not so
> >> memory
> >>>> friendly so I prefer this design.
> >>>>
> >>>>
> >>>>
> >>>> Thanks
> >>>>
> >>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
> >>>>> possible improvements. I have one proposal. Instead of having a
> method:
> >>>>>
> >>>>> boolean advance() throws IOException;
> >>>>>
> >>>>> I would replace it with
> >>>>>
> >>>>> /*
> >>>>> * Return a future, which when completed means that source has more
> >> data
> >>>>> and getNext() will not block.
> >>>>> * If you wish to use benefits of non blocking connectors, please
> >>>>> implement this method appropriately.
> >>>>> */
> >>>>> default CompletableFuture<?> isBlocked() {
> >>>>>        return CompletableFuture.completedFuture(null);
> >>>>> }
> >>>>>
> >>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>
> >>>>> Couple of arguments:
> >>>>> 1. I don’t understand the division of work between `advance()` and
> >>>>> `getCurrent()`. What should be done in which, especially for
> connectors
> >>>>> that handle records in batches (like Kafka) and when should you call
> >>>>> `advance` and when `getCurrent()`.
> >>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
> the
> >>>>> future to have asynchronous/non blocking connectors and more
> >> efficiently
> >>>>> handle large number of blocked threads, without busy waiting. While
> at
> >> the
> >>>>> same time it doesn’t add much complexity, since naive connector
> >>>>> implementations can be always blocking.
> >>>>> 3. This also would allow us to use a fixed size thread pool of task
> >>>>> executors, instead of one thread per task.
> >>>>>
> >>>>> Piotrek
> >>>>>
> >>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> >>>>> wrote:
> >>>>>>
> >>>>>> Hi All,
> >>>>>>
> >>>>>> In order to finally get the ball rolling on the new source interface
> >>>>> that we have discussed for so long I finally created a FLIP:
> >>>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>
> >>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> about
> >>>>> adding per-partition watermark support to the Kinesis source and
> >> because
> >>>>> this would enable generic implementation of event-time alignment for
> >> all
> >>>>> sources. Maybe we need another FLIP for the event-time alignment
> part,
> >>>>> especially the part about information sharing between operations (I'm
> >> not
> >>>>> calling it state sharing because state has a special meaning in
> Flink).
> >>>>>>
> >>>>>> Please discuss away!
> >>>>>>
> >>>>>> Aljoscha
> >>>>>>
> >>>>>>
> >>>>>
> >>>>>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Piotr Nowojski
Hi Becket,

With my proposal, both of your examples would have to be solved by the connector and solution to both problems would be the same:

Pretend that connector is never blocked (`isBlocked() { return NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion (or semi blocking with return of control from time to time to allow for checkpointing, network flushing and other resource management things to happen in the same main thread). In other words, exactly how you would implement `take()` method or how the same source connector would be implemented NOW with current source interface. The difference with current interface would be only that main loop would be outside of the connector, and instead of periodically releasing checkpointing lock, periodically `return null;` or `return Optional.empty();` from `getNextElement()`.

In case of thread-less readers with only non-blocking `queue.poll()` (is that really a thing? I can not think about a real implementation that enforces such constraints), we could provide a wrapper that hides the busy looping. The same applies how to solve forever blocking readers - we could provider another wrapper running the connector in separate thread.  

But I don’t see a reason why we should expose both blocking `take()` and non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or connector) would have to do the same busy looping anyway and I think it would be better to have a simpler connector API (that would solve our problems) and force connectors to comply one way or another.

Piotrek

> On 7 Nov 2018, at 10:55, Becket Qin <[hidden email]> wrote:
>
> Hi Piotr,
>
> I might have misunderstood you proposal. But let me try to explain my
> concern. I am thinking about the following case:
> 1. a reader has the following two interfaces,
>    boolean isBlocked()
>    T getNextElement()
> 2. the implementation of getNextElement() is non-blocking.
> 3. The reader is thread-less, i.e. it does not have any internal thread.
> For example, it might just delegate the getNextElement() to a queue.poll(),
> and isBlocked() is just queue.isEmpty().
>
> How can Flink efficiently implement a blocking reading behavior with this
> reader? Either a tight loop or a backoff interval is needed. Neither of
> them is ideal.
>
> Now let's say in the reader mentioned above implements a blocking
> getNextElement() method. Because there is no internal thread in the reader,
> after isBlocked() returns false. Flink will still have to loop on
> isBlocked() to check whether the next record is available. If the next
> record reaches after 10 min, it is a tight loop for 10 min. You have
> probably noticed that in this case, even isBlocked() returns a future, that
> future() will not be completed if Flink does not call some method from the
> reader, because the reader has no internal thread to complete that future
> by itself.
>
> Due to the above reasons, a blocking take() API would allow Flink to have
> an efficient way to read from a reader. There are many ways to wake up the
> blocking thread when checkpointing is needed depending on the
> implementation. But I think the poll()/take() API would also work in that
> case.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <[hidden email]>
> wrote:
>
>> Hi,
>>
>> a)
>>
>>> BTW, regarding the isBlock() method, I have a few more questions. 21, Is
>> a method isReady() with boolean as a return value
>>> equivalent? Personally I found it is a little bit confusing in what is
>> supposed to be returned when the future is completed. 22. if
>>> the implementation of isBlocked() is optional, how do the callers know
>> whether the method is properly implemented or not?
>>> Does not implemented mean it always return a completed future?
>>
>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
>> `boolean hasNext()` which in case of “false” provides some kind of a
>> listener/callback that notifies about presence of next element. There are
>> some minor details, like `CompletableFuture<?>` has a minimal two state
>> logic:
>>
>> 1. Future is completed - we have more data
>> 2. Future not yet completed - we don’t have data now, but we might/we will
>> have in the future
>>
>> While `boolean hasNext()` and `notify()` callback are a bit more
>> complicated/dispersed and can lead/encourage `notify()` spam.
>>
>> b)
>>
>>> 3. If merge the `advance` and `getCurrent`  to one method like `getNext`
>> the `getNext` would need return a
>>> `ElementWithTimestamp` because some sources want to add timestamp to
>> every element. IMO, this is not so memory friendly
>>> so I prefer this design.
>>
>> Guowei I don’t quite understand this. Could you elaborate why having a
>> separate `advance()` help?
>>
>> c)
>>
>> Regarding advance/poll/take. What’s the value of having two separate
>> methods: poll and take? Which one of them should be called and which
>> implemented? What’s the benefit of having those methods compared to having
>> a one single method `getNextElement()` (or `pollElement() or whatever we
>> name it) with following contract:
>>
>> CompletableFuture<?> isBlocked();
>>
>> /**
>> Return next element - will be called only if `isBlocked()` is completed.
>> Try to implement it in non blocking fashion, but if that’s impossible or
>> you just don’t need the effort, you can block in this method.
>> */
>> T getNextElement();
>>
>> I mean, if the connector is implemented non-blockingly, Flink should use
>> it that way. If it’s not, then `poll()` will `throw new
>> NotImplementedException()`. Implementing both of them and providing both of
>> them to Flink wouldn’t make a sense, thus why not merge them into a single
>> method call that should preferably (but not necessarily need to) be
>> non-blocking? It’s not like we are implementing general purpose `Queue`,
>> which users might want to call either of `poll` or `take`. We would always
>> prefer to call `poll`, but if it’s blocking, then still we have no choice,
>> but to call it and block on it.
>>
>> d)
>>
>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>> important. But in addition to `Future/poll`, there may be another way to
>>> achieve this. I think it may be not very memory friendly if every advance
>>> call return a Future.
>>
>> I didn’t want to mention this, to not clog my initial proposal, but there
>> is a simple solution for the problem:
>>
>> public interface SplitReader {
>>
>>    (…)
>>
>>    CompletableFuture<?> NOT_BLOCKED =
>> CompletableFuture.completedFuture(null);
>>
>>    /**
>>     * Returns a future that will be completed when the page source becomes
>>     * unblocked.  If the page source is not blocked, this method should
>> return
>>     * {@code NOT_BLOCKED}.
>>     */
>>    default CompletableFuture<?> isBlocked()
>>    {
>>        return NOT_BLOCKED;
>>    }
>>
>> If we are blocked and we are waiting for the IO, then creating a new
>> Future is non-issue. Under full throttle/throughput and not blocked sources
>> returning a static `NOT_BLOCKED` constant  should also solve the problem.
>>
>> One more remark, non-blocking sources might be a necessity in a single
>> threaded model without a checkpointing lock. (Currently when sources are
>> blocked, they can release checkpointing lock and re-acquire it again
>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
>> happen when source is idling. In that case either `notify()` or my proposed
>> `isBlocked()` would allow to avoid busy-looping.
>>
>> Piotrek
>>
>>> On 5 Nov 2018, at 03:59, Becket Qin <[hidden email]> wrote:
>>>
>>> Hi Thomas,
>>>
>>> The iterator-like API was also the first thing that came to me. But it
>>> seems a little confusing that hasNext() does not mean "the stream has not
>>> ended", but means "the next record is ready", which is repurposing the
>> well
>>> known meaning of hasNext(). If we follow the hasNext()/next() pattern, an
>>> additional isNextReady() method to indicate whether the next record is
>>> ready seems more intuitive to me.
>>>
>>> Similarly, in poll()/take() pattern, another method of isDone() is needed
>>> to indicate whether the stream has ended or not.
>>>
>>> Compared with hasNext()/next()/isNextReady() pattern,
>>> isDone()/poll()/take() seems more flexible for the reader implementation.
>>> When I am implementing a reader, I could have a couple of choices:
>>>
>>>  - A thread-less reader that does not have any internal thread.
>>>  - When poll() is called, the same calling thread will perform a bunch
>> of
>>>     IO asynchronously.
>>>     - When take() is called, the same calling thread will perform a
>> bunch
>>>     of IO and wait until the record is ready.
>>>  - A reader with internal threads performing network IO and put records
>>>  into a buffer.
>>>     - When poll() is called, the calling thread simply reads from the
>>>     buffer and return empty result immediately if there is no record.
>>>     - When take() is called, the calling thread reads from the buffer
>> and
>>>     block waiting if the buffer is empty.
>>>
>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is
>> less
>>> intuitive for the reader developers to write the thread-less pattern.
>>> Although technically speaking one can still do the asynchronous IO to
>>> prepare the record in isNextReady(). But it is inexplicit and seems
>>> somewhat hacky.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>>
>>>
>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
>>>
>>>> Couple more points regarding discovery:
>>>>
>>>> The proposal mentions that discovery could be outside the execution
>> graph.
>>>> Today, discovered partitions/shards are checkpointed. I believe that
>> will
>>>> also need to be the case in the future, even when discovery and reading
>> are
>>>> split between different tasks.
>>>>
>>>> For cases such as resharding of a Kinesis stream, the relationship
>> between
>>>> splits needs to be considered. Splits cannot be randomly distributed
>> over
>>>> readers in certain situations. An example was mentioned here:
>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>
>>>> Thomas
>>>>
>>>>
>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
>>>>
>>>>> Thanks for getting the ball rolling on this!
>>>>>
>>>>> Can the number of splits decrease? Yes, splits can be closed and go
>> away.
>>>>> An example would be a shard merge in Kinesis (2 existing shards will be
>>>>> closed and replaced with a new shard).
>>>>>
>>>>> Regarding advance/poll/take: IMO the least restrictive approach would
>> be
>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves
>> new
>>>>> records when available). The current Kinesis API requires the use of
>>>>> threads. But that can be internal to the split reader and does not need
>>>> to
>>>>> be a source API concern. In fact, that's what we are working on right
>> now
>>>>> as improvement to the existing consumer: Each shard consumer thread
>> will
>>>>> push to a queue, the consumer main thread will poll the queue(s). It is
>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>
>>>>> The proposed SplitReader interface would fit the thread-less IO model.
>>>>> Similar to an iterator, we find out if there is a new element (hasNext)
>>>> and
>>>>> if so, move to it (next()). Separate calls deliver the meta information
>>>>> (timestamp, watermark). Perhaps advance call could offer a timeout
>>>> option,
>>>>> so that the caller does not end up in a busy wait. On the other hand, a
>>>>> caller processing multiple splits may want to cycle through fast, to
>>>>> process elements of other splits as soon as they become available. The
>>>> nice
>>>>> thing is that this "split merge" logic can now live in Flink and be
>>>>> optimized and shared between different sources.
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]> wrote:
>>>>>
>>>>>> Hi,
>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>
>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>>> important. But in addition to `Future/poll`, there may be another way
>> to
>>>>>> achieve this. I think it may be not very memory friendly if every
>>>> advance
>>>>>> call return a Future.
>>>>>>
>>>>>> public interface Listener {
>>>>>>    public void notify();
>>>>>> }
>>>>>>
>>>>>> public interface SplitReader() {
>>>>>>    /**
>>>>>>     * When there is no element temporarily, this will return false.
>>>>>>     * When elements is available again splitReader can call
>>>>>> listener.notify()
>>>>>>     * In addition the frame would check `advance` periodically .
>>>>>>     * Of course advance can always return true and ignore the
>> listener
>>>>>> argument for simplicity.
>>>>>>     */
>>>>>>    public boolean advance(Listener listener);
>>>>>> }
>>>>>>
>>>>>> 2.  The FLIP tells us very clearly that how to create all Splits and
>> how
>>>>>> to create a SplitReader from a Split. But there is no strategy for the
>>>> user
>>>>>> to choose how to assign the splits to the tasks. I think we could add
>> a
>>>>>> Enum to let user to choose.
>>>>>> /**
>>>>>> public Enum SplitsAssignmentPolicy {
>>>>>>   Location,
>>>>>>   Workload,
>>>>>>   Random,
>>>>>>   Average
>>>>>> }
>>>>>> */
>>>>>>
>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>> `getNext`
>>>>>> the `getNext` would need return a `ElementWithTimestamp` because some
>>>>>> sources want to add timestamp to every element. IMO, this is not so
>>>> memory
>>>>>> friendly so I prefer this design.
>>>>>>
>>>>>>
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of other
>>>>>>> possible improvements. I have one proposal. Instead of having a
>> method:
>>>>>>>
>>>>>>> boolean advance() throws IOException;
>>>>>>>
>>>>>>> I would replace it with
>>>>>>>
>>>>>>> /*
>>>>>>> * Return a future, which when completed means that source has more
>>>> data
>>>>>>> and getNext() will not block.
>>>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>>>> implement this method appropriately.
>>>>>>> */
>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>       return CompletableFuture.completedFuture(null);
>>>>>>> }
>>>>>>>
>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>
>>>>>>> Couple of arguments:
>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>> `getCurrent()`. What should be done in which, especially for
>> connectors
>>>>>>> that handle records in batches (like Kafka) and when should you call
>>>>>>> `advance` and when `getCurrent()`.
>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
>> the
>>>>>>> future to have asynchronous/non blocking connectors and more
>>>> efficiently
>>>>>>> handle large number of blocked threads, without busy waiting. While
>> at
>>>> the
>>>>>>> same time it doesn’t add much complexity, since naive connector
>>>>>>> implementations can be always blocking.
>>>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>>>> executors, instead of one thread per task.
>>>>>>>
>>>>>>> Piotrek
>>>>>>>
>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
>>>>>>> wrote:
>>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> In order to finally get the ball rolling on the new source interface
>>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>
>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
>> about
>>>>>>> adding per-partition watermark support to the Kinesis source and
>>>> because
>>>>>>> this would enable generic implementation of event-time alignment for
>>>> all
>>>>>>> sources. Maybe we need another FLIP for the event-time alignment
>> part,
>>>>>>> especially the part about information sharing between operations (I'm
>>>> not
>>>>>>> calling it state sharing because state has a special meaning in
>> Flink).
>>>>>>>>
>>>>>>>> Please discuss away!
>>>>>>>>
>>>>>>>> Aljoscha
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Hi Piotrek,

> But I don’t see a reason why we should expose both blocking `take()` and
non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
connector) would have to do the same busy
> looping anyway and I think it would be better to have a simpler connector
API (that would solve our problems) and force connectors to comply one way
or another.

If we let the block happen inside the connector, the blocking does not have
to be a busy loop. For example, to do the block waiting efficiently, the
connector can use java NIO selector().select which relies on OS syscall
like epoll[1] instead of busy looping. But if Flink engine blocks outside
the connector, it pretty much has to do the busy loop. So if there is only
one API to get the element, a blocking getNextElement() makes more sense.
In any case, we should avoid ambiguity. It has to be crystal clear about
whether a method is expected to be blocking or non-blocking. Otherwise it
would be very difficult for Flink engine to do the right thing with the
connectors. At the first glance at getCurrent(), the expected behavior is
not quite clear.

That said, I do agree that functionality wise, poll() and take() kind of
overlap. But they are actually not quite different from
isBlocked()/getNextElement(). Compared with isBlocked(), the only
difference is that poll() also returns the next record if it is available.
But I agree that the isBlocked() + getNextElement() is more flexible as
users can just check the record availability, but not fetch the next
element.

> In case of thread-less readers with only non-blocking `queue.poll()` (is
that really a thing? I can not think about a real implementation that
enforces such constraints)
Right, it is pretty much a syntax sugar to allow user combine the
check-and-take into one method. It could be achieved with isBlocked() +
getNextElement().

[1] http://man7.org/linux/man-pages/man7/epoll.7.html

Thanks,

Jiangjie (Becket) Qin

On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <[hidden email]>
wrote:

> Hi Becket,
>
> With my proposal, both of your examples would have to be solved by the
> connector and solution to both problems would be the same:
>
> Pretend that connector is never blocked (`isBlocked() { return
> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion (or
> semi blocking with return of control from time to time to allow for
> checkpointing, network flushing and other resource management things to
> happen in the same main thread). In other words, exactly how you would
> implement `take()` method or how the same source connector would be
> implemented NOW with current source interface. The difference with current
> interface would be only that main loop would be outside of the connector,
> and instead of periodically releasing checkpointing lock, periodically
> `return null;` or `return Optional.empty();` from `getNextElement()`.
>
> In case of thread-less readers with only non-blocking `queue.poll()` (is
> that really a thing? I can not think about a real implementation that
> enforces such constraints), we could provide a wrapper that hides the busy
> looping. The same applies how to solve forever blocking readers - we could
> provider another wrapper running the connector in separate thread.
>
> But I don’t see a reason why we should expose both blocking `take()` and
> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
> connector) would have to do the same busy looping anyway and I think it
> would be better to have a simpler connector API (that would solve our
> problems) and force connectors to comply one way or another.
>
> Piotrek
>
> > On 7 Nov 2018, at 10:55, Becket Qin <[hidden email]> wrote:
> >
> > Hi Piotr,
> >
> > I might have misunderstood you proposal. But let me try to explain my
> > concern. I am thinking about the following case:
> > 1. a reader has the following two interfaces,
> >    boolean isBlocked()
> >    T getNextElement()
> > 2. the implementation of getNextElement() is non-blocking.
> > 3. The reader is thread-less, i.e. it does not have any internal thread.
> > For example, it might just delegate the getNextElement() to a
> queue.poll(),
> > and isBlocked() is just queue.isEmpty().
> >
> > How can Flink efficiently implement a blocking reading behavior with this
> > reader? Either a tight loop or a backoff interval is needed. Neither of
> > them is ideal.
> >
> > Now let's say in the reader mentioned above implements a blocking
> > getNextElement() method. Because there is no internal thread in the
> reader,
> > after isBlocked() returns false. Flink will still have to loop on
> > isBlocked() to check whether the next record is available. If the next
> > record reaches after 10 min, it is a tight loop for 10 min. You have
> > probably noticed that in this case, even isBlocked() returns a future,
> that
> > future() will not be completed if Flink does not call some method from
> the
> > reader, because the reader has no internal thread to complete that future
> > by itself.
> >
> > Due to the above reasons, a blocking take() API would allow Flink to have
> > an efficient way to read from a reader. There are many ways to wake up
> the
> > blocking thread when checkpointing is needed depending on the
> > implementation. But I think the poll()/take() API would also work in that
> > case.
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <[hidden email]>
> > wrote:
> >
> >> Hi,
> >>
> >> a)
> >>
> >>> BTW, regarding the isBlock() method, I have a few more questions. 21,
> Is
> >> a method isReady() with boolean as a return value
> >>> equivalent? Personally I found it is a little bit confusing in what is
> >> supposed to be returned when the future is completed. 22. if
> >>> the implementation of isBlocked() is optional, how do the callers know
> >> whether the method is properly implemented or not?
> >>> Does not implemented mean it always return a completed future?
> >>
> >> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
> >> `boolean hasNext()` which in case of “false” provides some kind of a
> >> listener/callback that notifies about presence of next element. There
> are
> >> some minor details, like `CompletableFuture<?>` has a minimal two state
> >> logic:
> >>
> >> 1. Future is completed - we have more data
> >> 2. Future not yet completed - we don’t have data now, but we might/we
> will
> >> have in the future
> >>
> >> While `boolean hasNext()` and `notify()` callback are a bit more
> >> complicated/dispersed and can lead/encourage `notify()` spam.
> >>
> >> b)
> >>
> >>> 3. If merge the `advance` and `getCurrent`  to one method like
> `getNext`
> >> the `getNext` would need return a
> >>> `ElementWithTimestamp` because some sources want to add timestamp to
> >> every element. IMO, this is not so memory friendly
> >>> so I prefer this design.
> >>
> >> Guowei I don’t quite understand this. Could you elaborate why having a
> >> separate `advance()` help?
> >>
> >> c)
> >>
> >> Regarding advance/poll/take. What’s the value of having two separate
> >> methods: poll and take? Which one of them should be called and which
> >> implemented? What’s the benefit of having those methods compared to
> having
> >> a one single method `getNextElement()` (or `pollElement() or whatever we
> >> name it) with following contract:
> >>
> >> CompletableFuture<?> isBlocked();
> >>
> >> /**
> >> Return next element - will be called only if `isBlocked()` is completed.
> >> Try to implement it in non blocking fashion, but if that’s impossible or
> >> you just don’t need the effort, you can block in this method.
> >> */
> >> T getNextElement();
> >>
> >> I mean, if the connector is implemented non-blockingly, Flink should use
> >> it that way. If it’s not, then `poll()` will `throw new
> >> NotImplementedException()`. Implementing both of them and providing
> both of
> >> them to Flink wouldn’t make a sense, thus why not merge them into a
> single
> >> method call that should preferably (but not necessarily need to) be
> >> non-blocking? It’s not like we are implementing general purpose `Queue`,
> >> which users might want to call either of `poll` or `take`. We would
> always
> >> prefer to call `poll`, but if it’s blocking, then still we have no
> choice,
> >> but to call it and block on it.
> >>
> >> d)
> >>
> >>> 1. I agree with Piotr and Becket that the non-blocking source is very
> >>> important. But in addition to `Future/poll`, there may be another way
> to
> >>> achieve this. I think it may be not very memory friendly if every
> advance
> >>> call return a Future.
> >>
> >> I didn’t want to mention this, to not clog my initial proposal, but
> there
> >> is a simple solution for the problem:
> >>
> >> public interface SplitReader {
> >>
> >>    (…)
> >>
> >>    CompletableFuture<?> NOT_BLOCKED =
> >> CompletableFuture.completedFuture(null);
> >>
> >>    /**
> >>     * Returns a future that will be completed when the page source
> becomes
> >>     * unblocked.  If the page source is not blocked, this method should
> >> return
> >>     * {@code NOT_BLOCKED}.
> >>     */
> >>    default CompletableFuture<?> isBlocked()
> >>    {
> >>        return NOT_BLOCKED;
> >>    }
> >>
> >> If we are blocked and we are waiting for the IO, then creating a new
> >> Future is non-issue. Under full throttle/throughput and not blocked
> sources
> >> returning a static `NOT_BLOCKED` constant  should also solve the
> problem.
> >>
> >> One more remark, non-blocking sources might be a necessity in a single
> >> threaded model without a checkpointing lock. (Currently when sources are
> >> blocked, they can release checkpointing lock and re-acquire it again
> >> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
> >> happen when source is idling. In that case either `notify()` or my
> proposed
> >> `isBlocked()` would allow to avoid busy-looping.
> >>
> >> Piotrek
> >>
> >>> On 5 Nov 2018, at 03:59, Becket Qin <[hidden email]> wrote:
> >>>
> >>> Hi Thomas,
> >>>
> >>> The iterator-like API was also the first thing that came to me. But it
> >>> seems a little confusing that hasNext() does not mean "the stream has
> not
> >>> ended", but means "the next record is ready", which is repurposing the
> >> well
> >>> known meaning of hasNext(). If we follow the hasNext()/next() pattern,
> an
> >>> additional isNextReady() method to indicate whether the next record is
> >>> ready seems more intuitive to me.
> >>>
> >>> Similarly, in poll()/take() pattern, another method of isDone() is
> needed
> >>> to indicate whether the stream has ended or not.
> >>>
> >>> Compared with hasNext()/next()/isNextReady() pattern,
> >>> isDone()/poll()/take() seems more flexible for the reader
> implementation.
> >>> When I am implementing a reader, I could have a couple of choices:
> >>>
> >>>  - A thread-less reader that does not have any internal thread.
> >>>  - When poll() is called, the same calling thread will perform a bunch
> >> of
> >>>     IO asynchronously.
> >>>     - When take() is called, the same calling thread will perform a
> >> bunch
> >>>     of IO and wait until the record is ready.
> >>>  - A reader with internal threads performing network IO and put records
> >>>  into a buffer.
> >>>     - When poll() is called, the calling thread simply reads from the
> >>>     buffer and return empty result immediately if there is no record.
> >>>     - When take() is called, the calling thread reads from the buffer
> >> and
> >>>     block waiting if the buffer is empty.
> >>>
> >>> On the other hand, with the hasNext()/next()/isNextReady() API, it is
> >> less
> >>> intuitive for the reader developers to write the thread-less pattern.
> >>> Although technically speaking one can still do the asynchronous IO to
> >>> prepare the record in isNextReady(). But it is inexplicit and seems
> >>> somewhat hacky.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>>
> >>>
> >>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
> >>>
> >>>> Couple more points regarding discovery:
> >>>>
> >>>> The proposal mentions that discovery could be outside the execution
> >> graph.
> >>>> Today, discovered partitions/shards are checkpointed. I believe that
> >> will
> >>>> also need to be the case in the future, even when discovery and
> reading
> >> are
> >>>> split between different tasks.
> >>>>
> >>>> For cases such as resharding of a Kinesis stream, the relationship
> >> between
> >>>> splits needs to be considered. Splits cannot be randomly distributed
> >> over
> >>>> readers in certain situations. An example was mentioned here:
> >>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>>>
> >>>> Thomas
> >>>>
> >>>>
> >>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
> >>>>
> >>>>> Thanks for getting the ball rolling on this!
> >>>>>
> >>>>> Can the number of splits decrease? Yes, splits can be closed and go
> >> away.
> >>>>> An example would be a shard merge in Kinesis (2 existing shards will
> be
> >>>>> closed and replaced with a new shard).
> >>>>>
> >>>>> Regarding advance/poll/take: IMO the least restrictive approach would
> >> be
> >>>>> the thread-less IO model (pull based, non-blocking, caller retrieves
> >> new
> >>>>> records when available). The current Kinesis API requires the use of
> >>>>> threads. But that can be internal to the split reader and does not
> need
> >>>> to
> >>>>> be a source API concern. In fact, that's what we are working on right
> >> now
> >>>>> as improvement to the existing consumer: Each shard consumer thread
> >> will
> >>>>> push to a queue, the consumer main thread will poll the queue(s). It
> is
> >>>>> essentially a mapping from threaded IO to non-blocking.
> >>>>>
> >>>>> The proposed SplitReader interface would fit the thread-less IO
> model.
> >>>>> Similar to an iterator, we find out if there is a new element
> (hasNext)
> >>>> and
> >>>>> if so, move to it (next()). Separate calls deliver the meta
> information
> >>>>> (timestamp, watermark). Perhaps advance call could offer a timeout
> >>>> option,
> >>>>> so that the caller does not end up in a busy wait. On the other
> hand, a
> >>>>> caller processing multiple splits may want to cycle through fast, to
> >>>>> process elements of other splits as soon as they become available.
> The
> >>>> nice
> >>>>> thing is that this "split merge" logic can now live in Flink and be
> >>>>> optimized and shared between different sources.
> >>>>>
> >>>>> Thanks,
> >>>>> Thomas
> >>>>>
> >>>>>
> >>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]>
> wrote:
> >>>>>
> >>>>>> Hi,
> >>>>>> Thanks Aljoscha for this FLIP.
> >>>>>>
> >>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> very
> >>>>>> important. But in addition to `Future/poll`, there may be another
> way
> >> to
> >>>>>> achieve this. I think it may be not very memory friendly if every
> >>>> advance
> >>>>>> call return a Future.
> >>>>>>
> >>>>>> public interface Listener {
> >>>>>>    public void notify();
> >>>>>> }
> >>>>>>
> >>>>>> public interface SplitReader() {
> >>>>>>    /**
> >>>>>>     * When there is no element temporarily, this will return false.
> >>>>>>     * When elements is available again splitReader can call
> >>>>>> listener.notify()
> >>>>>>     * In addition the frame would check `advance` periodically .
> >>>>>>     * Of course advance can always return true and ignore the
> >> listener
> >>>>>> argument for simplicity.
> >>>>>>     */
> >>>>>>    public boolean advance(Listener listener);
> >>>>>> }
> >>>>>>
> >>>>>> 2.  The FLIP tells us very clearly that how to create all Splits and
> >> how
> >>>>>> to create a SplitReader from a Split. But there is no strategy for
> the
> >>>> user
> >>>>>> to choose how to assign the splits to the tasks. I think we could
> add
> >> a
> >>>>>> Enum to let user to choose.
> >>>>>> /**
> >>>>>> public Enum SplitsAssignmentPolicy {
> >>>>>>   Location,
> >>>>>>   Workload,
> >>>>>>   Random,
> >>>>>>   Average
> >>>>>> }
> >>>>>> */
> >>>>>>
> >>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> >> `getNext`
> >>>>>> the `getNext` would need return a `ElementWithTimestamp` because
> some
> >>>>>> sources want to add timestamp to every element. IMO, this is not so
> >>>> memory
> >>>>>> friendly so I prefer this design.
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> >>>>>>
> >>>>>>> Hi,
> >>>>>>>
> >>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> other
> >>>>>>> possible improvements. I have one proposal. Instead of having a
> >> method:
> >>>>>>>
> >>>>>>> boolean advance() throws IOException;
> >>>>>>>
> >>>>>>> I would replace it with
> >>>>>>>
> >>>>>>> /*
> >>>>>>> * Return a future, which when completed means that source has more
> >>>> data
> >>>>>>> and getNext() will not block.
> >>>>>>> * If you wish to use benefits of non blocking connectors, please
> >>>>>>> implement this method appropriately.
> >>>>>>> */
> >>>>>>> default CompletableFuture<?> isBlocked() {
> >>>>>>>       return CompletableFuture.completedFuture(null);
> >>>>>>> }
> >>>>>>>
> >>>>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>>>
> >>>>>>> Couple of arguments:
> >>>>>>> 1. I don’t understand the division of work between `advance()` and
> >>>>>>> `getCurrent()`. What should be done in which, especially for
> >> connectors
> >>>>>>> that handle records in batches (like Kafka) and when should you
> call
> >>>>>>> `advance` and when `getCurrent()`.
> >>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
> >> the
> >>>>>>> future to have asynchronous/non blocking connectors and more
> >>>> efficiently
> >>>>>>> handle large number of blocked threads, without busy waiting. While
> >> at
> >>>> the
> >>>>>>> same time it doesn’t add much complexity, since naive connector
> >>>>>>> implementations can be always blocking.
> >>>>>>> 3. This also would allow us to use a fixed size thread pool of task
> >>>>>>> executors, instead of one thread per task.
> >>>>>>>
> >>>>>>> Piotrek
> >>>>>>>
> >>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
> >>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> Hi All,
> >>>>>>>>
> >>>>>>>> In order to finally get the ball rolling on the new source
> interface
> >>>>>>> that we have discussed for so long I finally created a FLIP:
> >>>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>>>
> >>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> >> about
> >>>>>>> adding per-partition watermark support to the Kinesis source and
> >>>> because
> >>>>>>> this would enable generic implementation of event-time alignment
> for
> >>>> all
> >>>>>>> sources. Maybe we need another FLIP for the event-time alignment
> >> part,
> >>>>>>> especially the part about information sharing between operations
> (I'm
> >>>> not
> >>>>>>> calling it state sharing because state has a special meaning in
> >> Flink).
> >>>>>>>>
> >>>>>>>> Please discuss away!
> >>>>>>>>
> >>>>>>>> Aljoscha
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>
> >>
> >>
>
>
Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Piotr Nowojski
Hi

Good point with select/epoll, however I do not see how they couldn’t be with Flink if we would like single task in Flink to be single-threaded (and I believe we should pursue this goal). If your connector blocks on `select`, then it can not process/handle control messages from Flink, like checkpoints, releasing resources and potentially output flushes. This would require tight integration between connector and Flink’s main event loop/selects/etc.

Looking at it from other perspective. Let’s assume that we have a connector implemented on top of `select`/`epoll`. In order to integrate it with Flink’s checkpointing/flushes/resource releasing it will have to be executed in separate thread one way or another. At least if our API will enforce/encourage non blocking implementations with some kind of notifications (`isBlocked()` or `notify()` callback), some connectors might skip one layer of wapping threads.

Btw, I don’t know if you understand my point. Having only `poll()` and `take()` is not enough for single threaded task. If our source interface doesn’t provide `notify()` callback nor `CompletableFuture<?> isBlocked(),`, there is no way to implement single threaded task that both reads the data from the source connector and can also react to system events. Ok, non blocking `poll()` would allow that, but with busy looping.

Piotrek

> On 8 Nov 2018, at 06:56, Becket Qin <[hidden email]> wrote:
>
> Hi Piotrek,
>
>> But I don’t see a reason why we should expose both blocking `take()` and
> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
> connector) would have to do the same busy
>> looping anyway and I think it would be better to have a simpler connector
> API (that would solve our problems) and force connectors to comply one way
> or another.
>
> If we let the block happen inside the connector, the blocking does not have
> to be a busy loop. For example, to do the block waiting efficiently, the
> connector can use java NIO selector().select which relies on OS syscall
> like epoll[1] instead of busy looping. But if Flink engine blocks outside
> the connector, it pretty much has to do the busy loop. So if there is only
> one API to get the element, a blocking getNextElement() makes more sense.
> In any case, we should avoid ambiguity. It has to be crystal clear about
> whether a method is expected to be blocking or non-blocking. Otherwise it
> would be very difficult for Flink engine to do the right thing with the
> connectors. At the first glance at getCurrent(), the expected behavior is
> not quite clear.
>
> That said, I do agree that functionality wise, poll() and take() kind of
> overlap. But they are actually not quite different from
> isBlocked()/getNextElement(). Compared with isBlocked(), the only
> difference is that poll() also returns the next record if it is available.
> But I agree that the isBlocked() + getNextElement() is more flexible as
> users can just check the record availability, but not fetch the next
> element.
>
>> In case of thread-less readers with only non-blocking `queue.poll()` (is
> that really a thing? I can not think about a real implementation that
> enforces such constraints)
> Right, it is pretty much a syntax sugar to allow user combine the
> check-and-take into one method. It could be achieved with isBlocked() +
> getNextElement().
>
> [1] http://man7.org/linux/man-pages/man7/epoll.7.html
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
> On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <[hidden email]>
> wrote:
>
>> Hi Becket,
>>
>> With my proposal, both of your examples would have to be solved by the
>> connector and solution to both problems would be the same:
>>
>> Pretend that connector is never blocked (`isBlocked() { return
>> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion (or
>> semi blocking with return of control from time to time to allow for
>> checkpointing, network flushing and other resource management things to
>> happen in the same main thread). In other words, exactly how you would
>> implement `take()` method or how the same source connector would be
>> implemented NOW with current source interface. The difference with current
>> interface would be only that main loop would be outside of the connector,
>> and instead of periodically releasing checkpointing lock, periodically
>> `return null;` or `return Optional.empty();` from `getNextElement()`.
>>
>> In case of thread-less readers with only non-blocking `queue.poll()` (is
>> that really a thing? I can not think about a real implementation that
>> enforces such constraints), we could provide a wrapper that hides the busy
>> looping. The same applies how to solve forever blocking readers - we could
>> provider another wrapper running the connector in separate thread.
>>
>> But I don’t see a reason why we should expose both blocking `take()` and
>> non-blocking `poll()` methods to the Flink engine. Someone (Flink engine or
>> connector) would have to do the same busy looping anyway and I think it
>> would be better to have a simpler connector API (that would solve our
>> problems) and force connectors to comply one way or another.
>>
>> Piotrek
>>
>>> On 7 Nov 2018, at 10:55, Becket Qin <[hidden email]> wrote:
>>>
>>> Hi Piotr,
>>>
>>> I might have misunderstood you proposal. But let me try to explain my
>>> concern. I am thinking about the following case:
>>> 1. a reader has the following two interfaces,
>>>   boolean isBlocked()
>>>   T getNextElement()
>>> 2. the implementation of getNextElement() is non-blocking.
>>> 3. The reader is thread-less, i.e. it does not have any internal thread.
>>> For example, it might just delegate the getNextElement() to a
>> queue.poll(),
>>> and isBlocked() is just queue.isEmpty().
>>>
>>> How can Flink efficiently implement a blocking reading behavior with this
>>> reader? Either a tight loop or a backoff interval is needed. Neither of
>>> them is ideal.
>>>
>>> Now let's say in the reader mentioned above implements a blocking
>>> getNextElement() method. Because there is no internal thread in the
>> reader,
>>> after isBlocked() returns false. Flink will still have to loop on
>>> isBlocked() to check whether the next record is available. If the next
>>> record reaches after 10 min, it is a tight loop for 10 min. You have
>>> probably noticed that in this case, even isBlocked() returns a future,
>> that
>>> future() will not be completed if Flink does not call some method from
>> the
>>> reader, because the reader has no internal thread to complete that future
>>> by itself.
>>>
>>> Due to the above reasons, a blocking take() API would allow Flink to have
>>> an efficient way to read from a reader. There are many ways to wake up
>> the
>>> blocking thread when checkpointing is needed depending on the
>>> implementation. But I think the poll()/take() API would also work in that
>>> case.
>>>
>>> Thanks,
>>>
>>> Jiangjie (Becket) Qin
>>>
>>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <[hidden email]>
>>> wrote:
>>>
>>>> Hi,
>>>>
>>>> a)
>>>>
>>>>> BTW, regarding the isBlock() method, I have a few more questions. 21,
>> Is
>>>> a method isReady() with boolean as a return value
>>>>> equivalent? Personally I found it is a little bit confusing in what is
>>>> supposed to be returned when the future is completed. 22. if
>>>>> the implementation of isBlocked() is optional, how do the callers know
>>>> whether the method is properly implemented or not?
>>>>> Does not implemented mean it always return a completed future?
>>>>
>>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
>>>> `boolean hasNext()` which in case of “false” provides some kind of a
>>>> listener/callback that notifies about presence of next element. There
>> are
>>>> some minor details, like `CompletableFuture<?>` has a minimal two state
>>>> logic:
>>>>
>>>> 1. Future is completed - we have more data
>>>> 2. Future not yet completed - we don’t have data now, but we might/we
>> will
>>>> have in the future
>>>>
>>>> While `boolean hasNext()` and `notify()` callback are a bit more
>>>> complicated/dispersed and can lead/encourage `notify()` spam.
>>>>
>>>> b)
>>>>
>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>> `getNext`
>>>> the `getNext` would need return a
>>>>> `ElementWithTimestamp` because some sources want to add timestamp to
>>>> every element. IMO, this is not so memory friendly
>>>>> so I prefer this design.
>>>>
>>>> Guowei I don’t quite understand this. Could you elaborate why having a
>>>> separate `advance()` help?
>>>>
>>>> c)
>>>>
>>>> Regarding advance/poll/take. What’s the value of having two separate
>>>> methods: poll and take? Which one of them should be called and which
>>>> implemented? What’s the benefit of having those methods compared to
>> having
>>>> a one single method `getNextElement()` (or `pollElement() or whatever we
>>>> name it) with following contract:
>>>>
>>>> CompletableFuture<?> isBlocked();
>>>>
>>>> /**
>>>> Return next element - will be called only if `isBlocked()` is completed.
>>>> Try to implement it in non blocking fashion, but if that’s impossible or
>>>> you just don’t need the effort, you can block in this method.
>>>> */
>>>> T getNextElement();
>>>>
>>>> I mean, if the connector is implemented non-blockingly, Flink should use
>>>> it that way. If it’s not, then `poll()` will `throw new
>>>> NotImplementedException()`. Implementing both of them and providing
>> both of
>>>> them to Flink wouldn’t make a sense, thus why not merge them into a
>> single
>>>> method call that should preferably (but not necessarily need to) be
>>>> non-blocking? It’s not like we are implementing general purpose `Queue`,
>>>> which users might want to call either of `poll` or `take`. We would
>> always
>>>> prefer to call `poll`, but if it’s blocking, then still we have no
>> choice,
>>>> but to call it and block on it.
>>>>
>>>> d)
>>>>
>>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
>>>>> important. But in addition to `Future/poll`, there may be another way
>> to
>>>>> achieve this. I think it may be not very memory friendly if every
>> advance
>>>>> call return a Future.
>>>>
>>>> I didn’t want to mention this, to not clog my initial proposal, but
>> there
>>>> is a simple solution for the problem:
>>>>
>>>> public interface SplitReader {
>>>>
>>>>   (…)
>>>>
>>>>   CompletableFuture<?> NOT_BLOCKED =
>>>> CompletableFuture.completedFuture(null);
>>>>
>>>>   /**
>>>>    * Returns a future that will be completed when the page source
>> becomes
>>>>    * unblocked.  If the page source is not blocked, this method should
>>>> return
>>>>    * {@code NOT_BLOCKED}.
>>>>    */
>>>>   default CompletableFuture<?> isBlocked()
>>>>   {
>>>>       return NOT_BLOCKED;
>>>>   }
>>>>
>>>> If we are blocked and we are waiting for the IO, then creating a new
>>>> Future is non-issue. Under full throttle/throughput and not blocked
>> sources
>>>> returning a static `NOT_BLOCKED` constant  should also solve the
>> problem.
>>>>
>>>> One more remark, non-blocking sources might be a necessity in a single
>>>> threaded model without a checkpointing lock. (Currently when sources are
>>>> blocked, they can release checkpointing lock and re-acquire it again
>>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
>>>> happen when source is idling. In that case either `notify()` or my
>> proposed
>>>> `isBlocked()` would allow to avoid busy-looping.
>>>>
>>>> Piotrek
>>>>
>>>>> On 5 Nov 2018, at 03:59, Becket Qin <[hidden email]> wrote:
>>>>>
>>>>> Hi Thomas,
>>>>>
>>>>> The iterator-like API was also the first thing that came to me. But it
>>>>> seems a little confusing that hasNext() does not mean "the stream has
>> not
>>>>> ended", but means "the next record is ready", which is repurposing the
>>>> well
>>>>> known meaning of hasNext(). If we follow the hasNext()/next() pattern,
>> an
>>>>> additional isNextReady() method to indicate whether the next record is
>>>>> ready seems more intuitive to me.
>>>>>
>>>>> Similarly, in poll()/take() pattern, another method of isDone() is
>> needed
>>>>> to indicate whether the stream has ended or not.
>>>>>
>>>>> Compared with hasNext()/next()/isNextReady() pattern,
>>>>> isDone()/poll()/take() seems more flexible for the reader
>> implementation.
>>>>> When I am implementing a reader, I could have a couple of choices:
>>>>>
>>>>> - A thread-less reader that does not have any internal thread.
>>>>> - When poll() is called, the same calling thread will perform a bunch
>>>> of
>>>>>    IO asynchronously.
>>>>>    - When take() is called, the same calling thread will perform a
>>>> bunch
>>>>>    of IO and wait until the record is ready.
>>>>> - A reader with internal threads performing network IO and put records
>>>>> into a buffer.
>>>>>    - When poll() is called, the calling thread simply reads from the
>>>>>    buffer and return empty result immediately if there is no record.
>>>>>    - When take() is called, the calling thread reads from the buffer
>>>> and
>>>>>    block waiting if the buffer is empty.
>>>>>
>>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is
>>>> less
>>>>> intuitive for the reader developers to write the thread-less pattern.
>>>>> Although technically speaking one can still do the asynchronous IO to
>>>>> prepare the record in isNextReady(). But it is inexplicit and seems
>>>>> somewhat hacky.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> Jiangjie (Becket) Qin
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
>>>>>
>>>>>> Couple more points regarding discovery:
>>>>>>
>>>>>> The proposal mentions that discovery could be outside the execution
>>>> graph.
>>>>>> Today, discovered partitions/shards are checkpointed. I believe that
>>>> will
>>>>>> also need to be the case in the future, even when discovery and
>> reading
>>>> are
>>>>>> split between different tasks.
>>>>>>
>>>>>> For cases such as resharding of a Kinesis stream, the relationship
>>>> between
>>>>>> splits needs to be considered. Splits cannot be randomly distributed
>>>> over
>>>>>> readers in certain situations. An example was mentioned here:
>>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
>>>>>>
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
>>>>>>
>>>>>>> Thanks for getting the ball rolling on this!
>>>>>>>
>>>>>>> Can the number of splits decrease? Yes, splits can be closed and go
>>>> away.
>>>>>>> An example would be a shard merge in Kinesis (2 existing shards will
>> be
>>>>>>> closed and replaced with a new shard).
>>>>>>>
>>>>>>> Regarding advance/poll/take: IMO the least restrictive approach would
>>>> be
>>>>>>> the thread-less IO model (pull based, non-blocking, caller retrieves
>>>> new
>>>>>>> records when available). The current Kinesis API requires the use of
>>>>>>> threads. But that can be internal to the split reader and does not
>> need
>>>>>> to
>>>>>>> be a source API concern. In fact, that's what we are working on right
>>>> now
>>>>>>> as improvement to the existing consumer: Each shard consumer thread
>>>> will
>>>>>>> push to a queue, the consumer main thread will poll the queue(s). It
>> is
>>>>>>> essentially a mapping from threaded IO to non-blocking.
>>>>>>>
>>>>>>> The proposed SplitReader interface would fit the thread-less IO
>> model.
>>>>>>> Similar to an iterator, we find out if there is a new element
>> (hasNext)
>>>>>> and
>>>>>>> if so, move to it (next()). Separate calls deliver the meta
>> information
>>>>>>> (timestamp, watermark). Perhaps advance call could offer a timeout
>>>>>> option,
>>>>>>> so that the caller does not end up in a busy wait. On the other
>> hand, a
>>>>>>> caller processing multiple splits may want to cycle through fast, to
>>>>>>> process elements of other splits as soon as they become available.
>> The
>>>>>> nice
>>>>>>> thing is that this "split merge" logic can now live in Flink and be
>>>>>>> optimized and shared between different sources.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]>
>> wrote:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>> Thanks Aljoscha for this FLIP.
>>>>>>>>
>>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
>> very
>>>>>>>> important. But in addition to `Future/poll`, there may be another
>> way
>>>> to
>>>>>>>> achieve this. I think it may be not very memory friendly if every
>>>>>> advance
>>>>>>>> call return a Future.
>>>>>>>>
>>>>>>>> public interface Listener {
>>>>>>>>   public void notify();
>>>>>>>> }
>>>>>>>>
>>>>>>>> public interface SplitReader() {
>>>>>>>>   /**
>>>>>>>>    * When there is no element temporarily, this will return false.
>>>>>>>>    * When elements is available again splitReader can call
>>>>>>>> listener.notify()
>>>>>>>>    * In addition the frame would check `advance` periodically .
>>>>>>>>    * Of course advance can always return true and ignore the
>>>> listener
>>>>>>>> argument for simplicity.
>>>>>>>>    */
>>>>>>>>   public boolean advance(Listener listener);
>>>>>>>> }
>>>>>>>>
>>>>>>>> 2.  The FLIP tells us very clearly that how to create all Splits and
>>>> how
>>>>>>>> to create a SplitReader from a Split. But there is no strategy for
>> the
>>>>>> user
>>>>>>>> to choose how to assign the splits to the tasks. I think we could
>> add
>>>> a
>>>>>>>> Enum to let user to choose.
>>>>>>>> /**
>>>>>>>> public Enum SplitsAssignmentPolicy {
>>>>>>>>  Location,
>>>>>>>>  Workload,
>>>>>>>>  Random,
>>>>>>>>  Average
>>>>>>>> }
>>>>>>>> */
>>>>>>>>
>>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
>>>> `getNext`
>>>>>>>> the `getNext` would need return a `ElementWithTimestamp` because
>> some
>>>>>>>> sources want to add timestamp to every element. IMO, this is not so
>>>>>> memory
>>>>>>>> friendly so I prefer this design.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>>
>>>>>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
>> other
>>>>>>>>> possible improvements. I have one proposal. Instead of having a
>>>> method:
>>>>>>>>>
>>>>>>>>> boolean advance() throws IOException;
>>>>>>>>>
>>>>>>>>> I would replace it with
>>>>>>>>>
>>>>>>>>> /*
>>>>>>>>> * Return a future, which when completed means that source has more
>>>>>> data
>>>>>>>>> and getNext() will not block.
>>>>>>>>> * If you wish to use benefits of non blocking connectors, please
>>>>>>>>> implement this method appropriately.
>>>>>>>>> */
>>>>>>>>> default CompletableFuture<?> isBlocked() {
>>>>>>>>>      return CompletableFuture.completedFuture(null);
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> And rename `getCurrent()` to `getNext()`.
>>>>>>>>>
>>>>>>>>> Couple of arguments:
>>>>>>>>> 1. I don’t understand the division of work between `advance()` and
>>>>>>>>> `getCurrent()`. What should be done in which, especially for
>>>> connectors
>>>>>>>>> that handle records in batches (like Kafka) and when should you
>> call
>>>>>>>>> `advance` and when `getCurrent()`.
>>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us in
>>>> the
>>>>>>>>> future to have asynchronous/non blocking connectors and more
>>>>>> efficiently
>>>>>>>>> handle large number of blocked threads, without busy waiting. While
>>>> at
>>>>>> the
>>>>>>>>> same time it doesn’t add much complexity, since naive connector
>>>>>>>>> implementations can be always blocking.
>>>>>>>>> 3. This also would allow us to use a fixed size thread pool of task
>>>>>>>>> executors, instead of one thread per task.
>>>>>>>>>
>>>>>>>>> Piotrek
>>>>>>>>>
>>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]>
>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> In order to finally get the ball rolling on the new source
>> interface
>>>>>>>>> that we have discussed for so long I finally created a FLIP:
>>>>>>>>>
>>>>>>
>>>>
>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
>>>>>>>>>>
>>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
>>>> about
>>>>>>>>> adding per-partition watermark support to the Kinesis source and
>>>>>> because
>>>>>>>>> this would enable generic implementation of event-time alignment
>> for
>>>>>> all
>>>>>>>>> sources. Maybe we need another FLIP for the event-time alignment
>>>> part,
>>>>>>>>> especially the part about information sharing between operations
>> (I'm
>>>>>> not
>>>>>>>>> calling it state sharing because state has a special meaning in
>>>> Flink).
>>>>>>>>>>
>>>>>>>>>> Please discuss away!
>>>>>>>>>>
>>>>>>>>>> Aljoscha
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>
>>>>
>>>>
>>
>>

Reply | Threaded
Open this post in threaded view
|

Re: [DISCUSS] FLIP-27: Refactor Source Interface

Becket Qin
Hi Piotrek,

Thanks for the explanation. We are probably talking about the same thing
but in different ways. To clarify a little bit, I think there are two
patterns to read from a connector.

Pattern 1: Thread-less connector with a blocking read API. Outside of the
connector, there is one IO thread per reader, doing blocking read. An
additional thread will interact with all the IO threads.
Pattern 2: Connector with internal thread(s) and non-blocking API. Outside
of the connector, there is one thread for ALL readers, doing IO relying on
notification callbacks in the reader.

In both patterns, there must be at least one thread per connector, either
inside (created by connector writers) or outside (created by Flink) of the
connector. Ideally there are NUM_CONNECTORS + 1 threads in total, to make
sure that 1 thread is fully non-blocking.

>Btw, I don’t know if you understand my point. Having only `poll()` and
`take()` is not enough for single threaded task. If our source interface
doesn’t provide `notify()` callback nor >`CompletableFuture<?>
isBlocked(),`, there is no way to implement single threaded task that both
reads the data from the source connector and can also react to system
events. Ok, non >blocking `poll()` would allow that, but with busy looping.

Completely agree that in pattern 2, having a callback is necessary for that
single thread outside of the connectors. And the connectors MUST have
internal threads. If we go that way, we should have something like "void
poll(Callback) / void advance(callback)". I am curious how would
CompletableFuture work here, though. If 10 readers returns 10 completable
futures, will there be 10 additional threads (so 20 threads in total)
blocking waiting on them? Or will there be a single thread busy loop
checking around?

WRT pattern 1, a single blocking take() API should just work. The good
thing is that a blocking read API is usually simpler to implement. An
additional non-blocking "T poll()" method here is indeed optional and could
be used in cases like Flink does not want the thread to block forever. They
can also be combined to have a "T poll(Timeout)", which is exactly what
KafkaConsumer did.

It sounds that you are proposing pattern 2 with something similar to NIO2
AsynchronousByteChannel[1]. That API would work, except that the signature
returning future seems not necessary. If that is the case, a minor change
on the current FLIP proposal to have "void advance(callback)" should work.
And this means the connectors MUST have their internal threads.

BTW, one thing I am also trying to avoid is pushing users to perform IO in
a method like "isBlocked()". If the method is expected to fetch records
(even if not returning them), naming it something more explicit would help
avoid confusion.

Thanks,

Jiangjie (Becket) Qin

[1]
https://docs.oracle.com/javase/8/docs/api/java/nio/channels/AsynchronousByteChannel.html

On Fri, Nov 9, 2018 at 11:20 PM Piotr Nowojski <[hidden email]>
wrote:

> Hi
>
> Good point with select/epoll, however I do not see how they couldn’t be
> with Flink if we would like single task in Flink to be single-threaded (and
> I believe we should pursue this goal). If your connector blocks on
> `select`, then it can not process/handle control messages from Flink, like
> checkpoints, releasing resources and potentially output flushes. This would
> require tight integration between connector and Flink’s main event
> loop/selects/etc.
>
> Looking at it from other perspective. Let’s assume that we have a
> connector implemented on top of `select`/`epoll`. In order to integrate it
> with Flink’s checkpointing/flushes/resource releasing it will have to be
> executed in separate thread one way or another. At least if our API will
> enforce/encourage non blocking implementations with some kind of
> notifications (`isBlocked()` or `notify()` callback), some connectors might
> skip one layer of wapping threads.
>
> Btw, I don’t know if you understand my point. Having only `poll()` and
> `take()` is not enough for single threaded task. If our source interface
> doesn’t provide `notify()` callback nor `CompletableFuture<?>
> isBlocked(),`, there is no way to implement single threaded task that both
> reads the data from the source connector and can also react to system
> events. Ok, non blocking `poll()` would allow that, but with busy looping.
>
> Piotrek
>
> > On 8 Nov 2018, at 06:56, Becket Qin <[hidden email]> wrote:
> >
> > Hi Piotrek,
> >
> >> But I don’t see a reason why we should expose both blocking `take()` and
> > non-blocking `poll()` methods to the Flink engine. Someone (Flink engine
> or
> > connector) would have to do the same busy
> >> looping anyway and I think it would be better to have a simpler
> connector
> > API (that would solve our problems) and force connectors to comply one
> way
> > or another.
> >
> > If we let the block happen inside the connector, the blocking does not
> have
> > to be a busy loop. For example, to do the block waiting efficiently, the
> > connector can use java NIO selector().select which relies on OS syscall
> > like epoll[1] instead of busy looping. But if Flink engine blocks outside
> > the connector, it pretty much has to do the busy loop. So if there is
> only
> > one API to get the element, a blocking getNextElement() makes more sense.
> > In any case, we should avoid ambiguity. It has to be crystal clear about
> > whether a method is expected to be blocking or non-blocking. Otherwise it
> > would be very difficult for Flink engine to do the right thing with the
> > connectors. At the first glance at getCurrent(), the expected behavior is
> > not quite clear.
> >
> > That said, I do agree that functionality wise, poll() and take() kind of
> > overlap. But they are actually not quite different from
> > isBlocked()/getNextElement(). Compared with isBlocked(), the only
> > difference is that poll() also returns the next record if it is
> available.
> > But I agree that the isBlocked() + getNextElement() is more flexible as
> > users can just check the record availability, but not fetch the next
> > element.
> >
> >> In case of thread-less readers with only non-blocking `queue.poll()` (is
> > that really a thing? I can not think about a real implementation that
> > enforces such constraints)
> > Right, it is pretty much a syntax sugar to allow user combine the
> > check-and-take into one method. It could be achieved with isBlocked() +
> > getNextElement().
> >
> > [1] http://man7.org/linux/man-pages/man7/epoll.7.html
> >
> > Thanks,
> >
> > Jiangjie (Becket) Qin
> >
> > On Wed, Nov 7, 2018 at 11:58 PM Piotr Nowojski <[hidden email]>
> > wrote:
> >
> >> Hi Becket,
> >>
> >> With my proposal, both of your examples would have to be solved by the
> >> connector and solution to both problems would be the same:
> >>
> >> Pretend that connector is never blocked (`isBlocked() { return
> >> NOT_BLOCKED; }`) and implement `getNextElement()` in blocking fashion
> (or
> >> semi blocking with return of control from time to time to allow for
> >> checkpointing, network flushing and other resource management things to
> >> happen in the same main thread). In other words, exactly how you would
> >> implement `take()` method or how the same source connector would be
> >> implemented NOW with current source interface. The difference with
> current
> >> interface would be only that main loop would be outside of the
> connector,
> >> and instead of periodically releasing checkpointing lock, periodically
> >> `return null;` or `return Optional.empty();` from `getNextElement()`.
> >>
> >> In case of thread-less readers with only non-blocking `queue.poll()` (is
> >> that really a thing? I can not think about a real implementation that
> >> enforces such constraints), we could provide a wrapper that hides the
> busy
> >> looping. The same applies how to solve forever blocking readers - we
> could
> >> provider another wrapper running the connector in separate thread.
> >>
> >> But I don’t see a reason why we should expose both blocking `take()` and
> >> non-blocking `poll()` methods to the Flink engine. Someone (Flink
> engine or
> >> connector) would have to do the same busy looping anyway and I think it
> >> would be better to have a simpler connector API (that would solve our
> >> problems) and force connectors to comply one way or another.
> >>
> >> Piotrek
> >>
> >>> On 7 Nov 2018, at 10:55, Becket Qin <[hidden email]> wrote:
> >>>
> >>> Hi Piotr,
> >>>
> >>> I might have misunderstood you proposal. But let me try to explain my
> >>> concern. I am thinking about the following case:
> >>> 1. a reader has the following two interfaces,
> >>>   boolean isBlocked()
> >>>   T getNextElement()
> >>> 2. the implementation of getNextElement() is non-blocking.
> >>> 3. The reader is thread-less, i.e. it does not have any internal
> thread.
> >>> For example, it might just delegate the getNextElement() to a
> >> queue.poll(),
> >>> and isBlocked() is just queue.isEmpty().
> >>>
> >>> How can Flink efficiently implement a blocking reading behavior with
> this
> >>> reader? Either a tight loop or a backoff interval is needed. Neither of
> >>> them is ideal.
> >>>
> >>> Now let's say in the reader mentioned above implements a blocking
> >>> getNextElement() method. Because there is no internal thread in the
> >> reader,
> >>> after isBlocked() returns false. Flink will still have to loop on
> >>> isBlocked() to check whether the next record is available. If the next
> >>> record reaches after 10 min, it is a tight loop for 10 min. You have
> >>> probably noticed that in this case, even isBlocked() returns a future,
> >> that
> >>> future() will not be completed if Flink does not call some method from
> >> the
> >>> reader, because the reader has no internal thread to complete that
> future
> >>> by itself.
> >>>
> >>> Due to the above reasons, a blocking take() API would allow Flink to
> have
> >>> an efficient way to read from a reader. There are many ways to wake up
> >> the
> >>> blocking thread when checkpointing is needed depending on the
> >>> implementation. But I think the poll()/take() API would also work in
> that
> >>> case.
> >>>
> >>> Thanks,
> >>>
> >>> Jiangjie (Becket) Qin
> >>>
> >>> On Wed, Nov 7, 2018 at 4:31 PM Piotr Nowojski <[hidden email]
> >
> >>> wrote:
> >>>
> >>>> Hi,
> >>>>
> >>>> a)
> >>>>
> >>>>> BTW, regarding the isBlock() method, I have a few more questions. 21,
> >> Is
> >>>> a method isReady() with boolean as a return value
> >>>>> equivalent? Personally I found it is a little bit confusing in what
> is
> >>>> supposed to be returned when the future is completed. 22. if
> >>>>> the implementation of isBlocked() is optional, how do the callers
> know
> >>>> whether the method is properly implemented or not?
> >>>>> Does not implemented mean it always return a completed future?
> >>>>
> >>>> `CompletableFuture<?> isBlocked()` is more or less an equivalent to
> >>>> `boolean hasNext()` which in case of “false” provides some kind of a
> >>>> listener/callback that notifies about presence of next element. There
> >> are
> >>>> some minor details, like `CompletableFuture<?>` has a minimal two
> state
> >>>> logic:
> >>>>
> >>>> 1. Future is completed - we have more data
> >>>> 2. Future not yet completed - we don’t have data now, but we might/we
> >> will
> >>>> have in the future
> >>>>
> >>>> While `boolean hasNext()` and `notify()` callback are a bit more
> >>>> complicated/dispersed and can lead/encourage `notify()` spam.
> >>>>
> >>>> b)
> >>>>
> >>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> >> `getNext`
> >>>> the `getNext` would need return a
> >>>>> `ElementWithTimestamp` because some sources want to add timestamp to
> >>>> every element. IMO, this is not so memory friendly
> >>>>> so I prefer this design.
> >>>>
> >>>> Guowei I don’t quite understand this. Could you elaborate why having a
> >>>> separate `advance()` help?
> >>>>
> >>>> c)
> >>>>
> >>>> Regarding advance/poll/take. What’s the value of having two separate
> >>>> methods: poll and take? Which one of them should be called and which
> >>>> implemented? What’s the benefit of having those methods compared to
> >> having
> >>>> a one single method `getNextElement()` (or `pollElement() or whatever
> we
> >>>> name it) with following contract:
> >>>>
> >>>> CompletableFuture<?> isBlocked();
> >>>>
> >>>> /**
> >>>> Return next element - will be called only if `isBlocked()` is
> completed.
> >>>> Try to implement it in non blocking fashion, but if that’s impossible
> or
> >>>> you just don’t need the effort, you can block in this method.
> >>>> */
> >>>> T getNextElement();
> >>>>
> >>>> I mean, if the connector is implemented non-blockingly, Flink should
> use
> >>>> it that way. If it’s not, then `poll()` will `throw new
> >>>> NotImplementedException()`. Implementing both of them and providing
> >> both of
> >>>> them to Flink wouldn’t make a sense, thus why not merge them into a
> >> single
> >>>> method call that should preferably (but not necessarily need to) be
> >>>> non-blocking? It’s not like we are implementing general purpose
> `Queue`,
> >>>> which users might want to call either of `poll` or `take`. We would
> >> always
> >>>> prefer to call `poll`, but if it’s blocking, then still we have no
> >> choice,
> >>>> but to call it and block on it.
> >>>>
> >>>> d)
> >>>>
> >>>>> 1. I agree with Piotr and Becket that the non-blocking source is very
> >>>>> important. But in addition to `Future/poll`, there may be another way
> >> to
> >>>>> achieve this. I think it may be not very memory friendly if every
> >> advance
> >>>>> call return a Future.
> >>>>
> >>>> I didn’t want to mention this, to not clog my initial proposal, but
> >> there
> >>>> is a simple solution for the problem:
> >>>>
> >>>> public interface SplitReader {
> >>>>
> >>>>   (…)
> >>>>
> >>>>   CompletableFuture<?> NOT_BLOCKED =
> >>>> CompletableFuture.completedFuture(null);
> >>>>
> >>>>   /**
> >>>>    * Returns a future that will be completed when the page source
> >> becomes
> >>>>    * unblocked.  If the page source is not blocked, this method should
> >>>> return
> >>>>    * {@code NOT_BLOCKED}.
> >>>>    */
> >>>>   default CompletableFuture<?> isBlocked()
> >>>>   {
> >>>>       return NOT_BLOCKED;
> >>>>   }
> >>>>
> >>>> If we are blocked and we are waiting for the IO, then creating a new
> >>>> Future is non-issue. Under full throttle/throughput and not blocked
> >> sources
> >>>> returning a static `NOT_BLOCKED` constant  should also solve the
> >> problem.
> >>>>
> >>>> One more remark, non-blocking sources might be a necessity in a single
> >>>> threaded model without a checkpointing lock. (Currently when sources
> are
> >>>> blocked, they can release checkpointing lock and re-acquire it again
> >>>> later). Non-blocking `poll`/`getNext()` would allow for checkpoints to
> >>>> happen when source is idling. In that case either `notify()` or my
> >> proposed
> >>>> `isBlocked()` would allow to avoid busy-looping.
> >>>>
> >>>> Piotrek
> >>>>
> >>>>> On 5 Nov 2018, at 03:59, Becket Qin <[hidden email]> wrote:
> >>>>>
> >>>>> Hi Thomas,
> >>>>>
> >>>>> The iterator-like API was also the first thing that came to me. But
> it
> >>>>> seems a little confusing that hasNext() does not mean "the stream has
> >> not
> >>>>> ended", but means "the next record is ready", which is repurposing
> the
> >>>> well
> >>>>> known meaning of hasNext(). If we follow the hasNext()/next()
> pattern,
> >> an
> >>>>> additional isNextReady() method to indicate whether the next record
> is
> >>>>> ready seems more intuitive to me.
> >>>>>
> >>>>> Similarly, in poll()/take() pattern, another method of isDone() is
> >> needed
> >>>>> to indicate whether the stream has ended or not.
> >>>>>
> >>>>> Compared with hasNext()/next()/isNextReady() pattern,
> >>>>> isDone()/poll()/take() seems more flexible for the reader
> >> implementation.
> >>>>> When I am implementing a reader, I could have a couple of choices:
> >>>>>
> >>>>> - A thread-less reader that does not have any internal thread.
> >>>>> - When poll() is called, the same calling thread will perform a bunch
> >>>> of
> >>>>>    IO asynchronously.
> >>>>>    - When take() is called, the same calling thread will perform a
> >>>> bunch
> >>>>>    of IO and wait until the record is ready.
> >>>>> - A reader with internal threads performing network IO and put
> records
> >>>>> into a buffer.
> >>>>>    - When poll() is called, the calling thread simply reads from the
> >>>>>    buffer and return empty result immediately if there is no record.
> >>>>>    - When take() is called, the calling thread reads from the buffer
> >>>> and
> >>>>>    block waiting if the buffer is empty.
> >>>>>
> >>>>> On the other hand, with the hasNext()/next()/isNextReady() API, it is
> >>>> less
> >>>>> intuitive for the reader developers to write the thread-less pattern.
> >>>>> Although technically speaking one can still do the asynchronous IO to
> >>>>> prepare the record in isNextReady(). But it is inexplicit and seems
> >>>>> somewhat hacky.
> >>>>>
> >>>>> Thanks,
> >>>>>
> >>>>> Jiangjie (Becket) Qin
> >>>>>
> >>>>>
> >>>>>
> >>>>> On Mon, Nov 5, 2018 at 6:55 AM Thomas Weise <[hidden email]> wrote:
> >>>>>
> >>>>>> Couple more points regarding discovery:
> >>>>>>
> >>>>>> The proposal mentions that discovery could be outside the execution
> >>>> graph.
> >>>>>> Today, discovered partitions/shards are checkpointed. I believe that
> >>>> will
> >>>>>> also need to be the case in the future, even when discovery and
> >> reading
> >>>> are
> >>>>>> split between different tasks.
> >>>>>>
> >>>>>> For cases such as resharding of a Kinesis stream, the relationship
> >>>> between
> >>>>>> splits needs to be considered. Splits cannot be randomly distributed
> >>>> over
> >>>>>> readers in certain situations. An example was mentioned here:
> >>>>>> https://github.com/apache/flink/pull/6980#issuecomment-435202809
> >>>>>>
> >>>>>> Thomas
> >>>>>>
> >>>>>>
> >>>>>> On Sun, Nov 4, 2018 at 1:43 PM Thomas Weise <[hidden email]> wrote:
> >>>>>>
> >>>>>>> Thanks for getting the ball rolling on this!
> >>>>>>>
> >>>>>>> Can the number of splits decrease? Yes, splits can be closed and go
> >>>> away.
> >>>>>>> An example would be a shard merge in Kinesis (2 existing shards
> will
> >> be
> >>>>>>> closed and replaced with a new shard).
> >>>>>>>
> >>>>>>> Regarding advance/poll/take: IMO the least restrictive approach
> would
> >>>> be
> >>>>>>> the thread-less IO model (pull based, non-blocking, caller
> retrieves
> >>>> new
> >>>>>>> records when available). The current Kinesis API requires the use
> of
> >>>>>>> threads. But that can be internal to the split reader and does not
> >> need
> >>>>>> to
> >>>>>>> be a source API concern. In fact, that's what we are working on
> right
> >>>> now
> >>>>>>> as improvement to the existing consumer: Each shard consumer thread
> >>>> will
> >>>>>>> push to a queue, the consumer main thread will poll the queue(s).
> It
> >> is
> >>>>>>> essentially a mapping from threaded IO to non-blocking.
> >>>>>>>
> >>>>>>> The proposed SplitReader interface would fit the thread-less IO
> >> model.
> >>>>>>> Similar to an iterator, we find out if there is a new element
> >> (hasNext)
> >>>>>> and
> >>>>>>> if so, move to it (next()). Separate calls deliver the meta
> >> information
> >>>>>>> (timestamp, watermark). Perhaps advance call could offer a timeout
> >>>>>> option,
> >>>>>>> so that the caller does not end up in a busy wait. On the other
> >> hand, a
> >>>>>>> caller processing multiple splits may want to cycle through fast,
> to
> >>>>>>> process elements of other splits as soon as they become available.
> >> The
> >>>>>> nice
> >>>>>>> thing is that this "split merge" logic can now live in Flink and be
> >>>>>>> optimized and shared between different sources.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Thomas
> >>>>>>>
> >>>>>>>
> >>>>>>> On Sun, Nov 4, 2018 at 6:34 AM Guowei Ma <[hidden email]>
> >> wrote:
> >>>>>>>
> >>>>>>>> Hi,
> >>>>>>>> Thanks Aljoscha for this FLIP.
> >>>>>>>>
> >>>>>>>> 1. I agree with Piotr and Becket that the non-blocking source is
> >> very
> >>>>>>>> important. But in addition to `Future/poll`, there may be another
> >> way
> >>>> to
> >>>>>>>> achieve this. I think it may be not very memory friendly if every
> >>>>>> advance
> >>>>>>>> call return a Future.
> >>>>>>>>
> >>>>>>>> public interface Listener {
> >>>>>>>>   public void notify();
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> public interface SplitReader() {
> >>>>>>>>   /**
> >>>>>>>>    * When there is no element temporarily, this will return false.
> >>>>>>>>    * When elements is available again splitReader can call
> >>>>>>>> listener.notify()
> >>>>>>>>    * In addition the frame would check `advance` periodically .
> >>>>>>>>    * Of course advance can always return true and ignore the
> >>>> listener
> >>>>>>>> argument for simplicity.
> >>>>>>>>    */
> >>>>>>>>   public boolean advance(Listener listener);
> >>>>>>>> }
> >>>>>>>>
> >>>>>>>> 2.  The FLIP tells us very clearly that how to create all Splits
> and
> >>>> how
> >>>>>>>> to create a SplitReader from a Split. But there is no strategy for
> >> the
> >>>>>> user
> >>>>>>>> to choose how to assign the splits to the tasks. I think we could
> >> add
> >>>> a
> >>>>>>>> Enum to let user to choose.
> >>>>>>>> /**
> >>>>>>>> public Enum SplitsAssignmentPolicy {
> >>>>>>>>  Location,
> >>>>>>>>  Workload,
> >>>>>>>>  Random,
> >>>>>>>>  Average
> >>>>>>>> }
> >>>>>>>> */
> >>>>>>>>
> >>>>>>>> 3. If merge the `advance` and `getCurrent`  to one method like
> >>>> `getNext`
> >>>>>>>> the `getNext` would need return a `ElementWithTimestamp` because
> >> some
> >>>>>>>> sources want to add timestamp to every element. IMO, this is not
> so
> >>>>>> memory
> >>>>>>>> friendly so I prefer this design.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>>
> >>>>>>>> Piotr Nowojski <[hidden email]> 于2018年11月1日周四 下午6:08写道:
> >>>>>>>>
> >>>>>>>>> Hi,
> >>>>>>>>>
> >>>>>>>>> Thanks Aljoscha for starting this, it’s blocking quite a lot of
> >> other
> >>>>>>>>> possible improvements. I have one proposal. Instead of having a
> >>>> method:
> >>>>>>>>>
> >>>>>>>>> boolean advance() throws IOException;
> >>>>>>>>>
> >>>>>>>>> I would replace it with
> >>>>>>>>>
> >>>>>>>>> /*
> >>>>>>>>> * Return a future, which when completed means that source has
> more
> >>>>>> data
> >>>>>>>>> and getNext() will not block.
> >>>>>>>>> * If you wish to use benefits of non blocking connectors, please
> >>>>>>>>> implement this method appropriately.
> >>>>>>>>> */
> >>>>>>>>> default CompletableFuture<?> isBlocked() {
> >>>>>>>>>      return CompletableFuture.completedFuture(null);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> And rename `getCurrent()` to `getNext()`.
> >>>>>>>>>
> >>>>>>>>> Couple of arguments:
> >>>>>>>>> 1. I don’t understand the division of work between `advance()`
> and
> >>>>>>>>> `getCurrent()`. What should be done in which, especially for
> >>>> connectors
> >>>>>>>>> that handle records in batches (like Kafka) and when should you
> >> call
> >>>>>>>>> `advance` and when `getCurrent()`.
> >>>>>>>>> 2. Replacing `boolean` with `CompletableFuture<?>` will allow us
> in
> >>>> the
> >>>>>>>>> future to have asynchronous/non blocking connectors and more
> >>>>>> efficiently
> >>>>>>>>> handle large number of blocked threads, without busy waiting.
> While
> >>>> at
> >>>>>> the
> >>>>>>>>> same time it doesn’t add much complexity, since naive connector
> >>>>>>>>> implementations can be always blocking.
> >>>>>>>>> 3. This also would allow us to use a fixed size thread pool of
> task
> >>>>>>>>> executors, instead of one thread per task.
> >>>>>>>>>
> >>>>>>>>> Piotrek
> >>>>>>>>>
> >>>>>>>>>> On 31 Oct 2018, at 17:22, Aljoscha Krettek <[hidden email]
> >
> >>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi All,
> >>>>>>>>>>
> >>>>>>>>>> In order to finally get the ball rolling on the new source
> >> interface
> >>>>>>>>> that we have discussed for so long I finally created a FLIP:
> >>>>>>>>>
> >>>>>>
> >>>>
> >>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface
> >>>>>>>>>>
> >>>>>>>>>> I cc'ed Thomas and Jamie because of the ongoing work/discussion
> >>>> about
> >>>>>>>>> adding per-partition watermark support to the Kinesis source and
> >>>>>> because
> >>>>>>>>> this would enable generic implementation of event-time alignment
> >> for
> >>>>>> all
> >>>>>>>>> sources. Maybe we need another FLIP for the event-time alignment
> >>>> part,
> >>>>>>>>> especially the part about information sharing between operations
> >> (I'm
> >>>>>> not
> >>>>>>>>> calling it state sharing because state has a special meaning in
> >>>> Flink).
> >>>>>>>>>>
> >>>>>>>>>> Please discuss away!
> >>>>>>>>>>
> >>>>>>>>>> Aljoscha
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>
> >>>>
> >>>>
> >>
> >>
>
>
12345