Apache Pinot Sink

Apache Pinot Sink

Poerschke, Mats
Hi all,

we want to contribute a sink connector for Apache Pinot. The following briefly describes the planned control flow. Please feel free to comment on any of its aspects.

Background
Apache Pinot is a large-scale real-time analytics engine that internally organizes data into segments. The controller exposes an external REST API for posting new segments. Each posted segment must carry an identifier, called the segment name.

Control Flow
The Flink sink will collect data tuples locally. When a checkpoint is created, all buffered tuples are grouped into one segment, which is then pushed to the Pinot controller. Each pushed segment is assigned a unique, incrementing identifier.
After receiving a success response from the Pinot controller, the latest segment name is persisted within the Flink checkpoint.
When recovering from a failure, the name of the latest successfully pushed segment can be reconstructed from the Flink checkpoint. At this point the system might be in an inconsistent state: the Pinot controller might already have stored a newer segment whose name was, due to the failure, not persisted in the Flink checkpoint.
This inconsistency is resolved with the next successful checkpoint: the segment pushed then is assigned the same name as the inconsistent segment, so Pinot replaces the old segment with the new one, preventing duplicate entries.
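The replace-on-re-push idea above can be sketched roughly as follows. The class and method names here are purely illustrative (not actual connector code): the point is that the counter behind the segment name only advances once Pinot acknowledges a push, so a re-push after recovery reuses the unacknowledged segment's name and overwrites it rather than duplicating its rows.

```java
// Illustrative sketch (hypothetical names): segment names derive from a
// counter that advances only after Pinot acknowledges the upload, so a
// re-push after recovery produces the same name again and replaces the
// orphaned segment instead of duplicating its rows.
final class SegmentNaming {
    private long committedCounter; // restored from the Flink checkpoint

    SegmentNaming(long committedCounter) {
        this.committedCounter = committedCounter;
    }

    /** Name for the next segment to push; identical again after a restore. */
    String nextSegmentName(String tableName) {
        return tableName + "-" + (committedCounter + 1);
    }

    /** Advance the counter only once Pinot has confirmed the upload. */
    void onPushAcknowledged() {
        committedCounter++;
    }
}
```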


Best regards
Mats Pörschke


Re: Apache Pinot Sink

Poerschke, Mats
Just as a short addition: We plan to contribute the sink to Apache Bahir.

Best regards
Mats Pörschke



Re: Apache Pinot Sink

Aljoscha Krettek-2
In reply to this post by Poerschke, Mats
It's great to see interest in this. Were you planning to use the new
Sink interface that we recently introduced? [1]

Best,
Aljoscha

[1] https://s.apache.org/FLIP-143


Re: Apache Pinot Sink

Poerschke, Mats
Yes, we will use the latest sink interface.

Best,
Mats



Re: Apache Pinot Sink

Aljoscha Krettek-2
That's good to hear. I wasn't sure because the explanation focused a lot
on checkpoints and their details, while with the new Sink interface
implementers don't need to be concerned with those. And in fact, when
the Sink is used in BATCH execution mode there will be no checkpoints.

Other than that, the implementation sketch makes sense to me. I think to
make further assessments you will probably have to work on a
proof-of-concept.

Best,
Aljoscha


Re: Apache Pinot Sink

Yupeng Fu
Hi Mats,

Glad to see this interest! We at Uber are also working on a Pinot sink (for
BATCH execution), with some help from the Pinot community on abstracting
Pinot interfaces for segment writes and catalog retrieval. Perhaps we can
collaborate on this proposal/POC.

Cheers,

Yupeng




Re: Apache Pinot Sink

Venkata Sanath Muppalla
+1 As Yupeng mentioned, we at Uber are also looking into the Pinot Sink. It
would be great to collaborate on this proposal.

Thanks,
Sanath


Re: Apache Pinot Sink

Poerschke, Mats
In reply to this post by Yupeng Fu
Hi all,

We want to give you a short update on the Pinot Sink since we started implementing a PoC.
As described earlier, we aim to use batch-uploading of segments to Pinot in combination with caching elements in the Flink sink.

Our current implementation works like this:

Besides the Pinot controller URI and the target table's name, the sink allows users to define the maximum number of rows per segment.

The PinotWriter is responsible for collecting elements, building segments and uploading them to Pinot. To do so, it retrieves the Schema and TableConfig via the Pinot controller API using the provided table name. Whenever the specified maximum number of rows is reached, it starts segment creation on disk; this process is handled by the Pinot admin tool. A segment ID is structured as follows: <table-name>-<subtask-id>-<incremental-counter>
Finally, the PinotWriter pushes the created segment to the Pinot controller, which then distributes it onto the Pinot servers.
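The segment ID layout just described can be captured in a small helper. This is an illustrative sketch, not the PoC code; the parsing assumes the counter is the final dash-separated component.

```java
// Sketch of the segment ID layout described above (illustrative helper,
// not the PoC code): <table-name>-<subtask-id>-<incremental-counter>.
final class PinotSegmentId {
    private PinotSegmentId() {}

    static String of(String tableName, int subtaskId, long counter) {
        return tableName + "-" + subtaskId + "-" + counter;
    }

    /** Extract the trailing counter, used to order the segments of one subtask. */
    static long counterOf(String segmentId) {
        return Long.parseLong(segmentId.substring(segmentId.lastIndexOf('-') + 1));
    }
}
```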

The PinotCommitter only checkpoints the segment ID of the most recently written segment. Note that multiple segments may have been uploaded to Pinot between two checkpoints.

As for future plans, we want to prevent high memory pressure when collecting elements in the PinotWriter by writing elements directly to disk. The main question at this point is whether we can assume access to a temporary directory on disk.

For checkpointing and failure recovery, we have sketched an approach that we have not yet tried. Upon recovery from a checkpoint, the PinotSink can access the latest segment ID stored in the checkpoint. The PinotWriter then compares the incremental counter of the checkpointed segment ID with the segments that already exist in Pinot for the same table and subtask ID. If segments with a higher counter value in their IDs are discovered, they are deleted to avoid duplicates. After that, processing continues as described above. We think this mode of recovery assumes that elements from upstream Flink tasks always arrive at the same subtask of the sink. Is this fair?
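The recovery comparison described above could look roughly like this (hypothetical names, assuming the counter is the final dash-separated part of the segment ID): segments whose counter exceeds the checkpointed one were pushed after the checkpoint and are candidates for deletion.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hedged sketch of the recovery step described above (hypothetical names):
// compare the counter of the checkpointed segment ID against the IDs
// already present in Pinot for the same table and subtask, and mark every
// segment with a higher counter for deletion.
final class RecoveryPlanner {
    private RecoveryPlanner() {}

    private static long counterOf(String segmentId) {
        return Long.parseLong(segmentId.substring(segmentId.lastIndexOf('-') + 1));
    }

    static List<String> segmentsToDelete(String checkpointedId, List<String> existing) {
        long committed = counterOf(checkpointedId);
        return existing.stream()
                .filter(id -> counterOf(id) > committed) // pushed after the checkpoint
                .collect(Collectors.toList());
    }
}
```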

Best regards,
Jakob and Mats



Re: Apache Pinot Sink

Till Rohrmann
Hi Mats and Jakob,

In the general case, I don't think that elements from upstream Flink tasks
always arrive at the same subtask of the sink. One problem is that user
computations can be non-deterministic. Moreover, a rebalance operation can
distribute the events of a task A among several downstream tasks B_1, B_2.
In this case, records are distributed in a round robin fashion. Therefore,
they depend on the arrival order at task A:

event_2, event_1 => A => event_1 => B_1
                      => event_2 => B_2

event_1, event_2 => A => event_2 => B_1
                      => event_1 => B_2

Since the order in which events from multiple producers arrive at the
consumer is not deterministic (e.g. due to network delays), you might see
different distributions.

However, I am not sure whether this is really a problem if every Pinot sink
makes sure that all segments which have been written after the last
checkpoint you have recovered from are deleted. You might just end up with
a different job result which is a member of the space of valid results.

One problem I see with eagerly writing segments to Pinot is that downstream
systems might already start consuming the results even though they might
still change because of a recovery. The way Flink solves this problem is to
only publish results once a checkpoint has been completed.
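The publish-on-checkpoint idea can be sketched in a few lines. This is a minimal illustration with hypothetical names, not Flink API: finished segments are staged locally and become visible to downstream consumers only once the covering checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch (hypothetical names, not Flink API) of publishing
// results only on checkpoint completion: finished segments are staged
// and made visible only when the covering checkpoint completes.
final class StagedPublisher {
    private final List<String> staged = new ArrayList<>();
    private final List<String> published = new ArrayList<>();

    void stage(String segmentId) {
        staged.add(segmentId); // written, but not yet visible downstream
    }

    /** Invoked once the checkpoint completes; only now are results published. */
    void onCheckpointComplete() {
        published.addAll(staged);
        staged.clear();
    }

    List<String> visibleSegments() {
        return List.copyOf(published);
    }
}
```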

Cheers,
Till


Re: Apache Pinot Sink

Yupeng Fu
Hi Mats and Jakob,

+1 to what Till said about the non-deterministic behavior. Also, I
suggest you look only at Pinot's offline segment creation from Flink.

Pinot provides a built-in lambda architecture and has real-time and
offline segments per table (architecture diagram
<https://docs.pinot.apache.org/basics/concepts#pinot-components>). The
real-time server ingests streams (e.g. Kafka) directly and buffers them
in memory, so that the buffer can serve queries within seconds of the
events being produced to the stream. If we use Flink to buffer and flush
to Pinot upon reaching a threshold, it defeats the purpose of real-time
serving and data freshness.

So it makes more sense to use Flink only for offline segment creation in
Pinot. In a typical real-life production environment, Pinot offline
segments are generated periodically (e.g. daily) from a scheduled job.
If we limit the scope to this, it becomes easier to solve the
determinism issue that Till mentioned: the entire job can be relaunched
and overwrite the previous batch of offline segments.

Also, there are existing conventions for segment names, and I suggest
you use the SegmentNameGenerator
<https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java>
to create the segments. In particular, it's important that the segment
name contains the time range (usually the start/end of the day), so that
you can overwrite the entire batch of segments by identifying the range
of the offline table.
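As an illustration only, a daily offline segment name embedding its time range might look like the following. This is written in the spirit of Pinot's SimpleSegmentNameGenerator; the real generator's API and exact format may differ, so treat the format string as an assumption and consult the linked source.

```java
import java.time.LocalDate;

// Illustrative only: a daily offline segment name that embeds its time
// range, in the spirit of Pinot's SimpleSegmentNameGenerator (the real
// generator's API and exact format may differ; see the linked source).
final class DailySegmentName {
    private DailySegmentName() {}

    static String of(String tableName, LocalDate day, int sequenceId) {
        // <table>_<startDate>_<endDate>_<sequenceId>
        return tableName + "_" + day + "_" + day + "_" + sequenceId;
    }
}
```

Embedding the range this way lets a relaunched daily job overwrite exactly the previous day's batch of segments.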

Lastly, a design doc will be helpful and I'm happy to contribute/review.

Best,

Yupeng



On Mon, Jan 25, 2021 at 9:37 AM Till Rohrmann <[hidden email]> wrote:

> Hi Mats and Jakob,
>
> In the general case, I don't think that elements from upstream Flink tasks
> always arrive at the same subtask of the sink. One problem is that user
> computations can be non-deterministic. Moreover, a rebalance operation can
> distribute the events of a task A among several downstream tasks B_1, B_2.
> In this case, records are distributed in a round robin fashion. Therefore,
> they depend on the arrival order at task A:
>
> event_2, event_1 => A => event_1 => B_1
>                                      => event_2 => B_2
>
> event_1, event_2 => A => event_2 => B_1
>                                      => event_1 => B_2
>
> Since the order in which events from multiple producers arrive at the
> consumer is not deterministic (e.g. due to network delays), you might see
> different distributions.
>
> However, I am not sure whether this is really a problem if every Pinot sink
> makes sure that all segments which have been written after the last
> checkpoint you have recovered from are deleted. You might just end up with
> a different job result which is a member of the space of valid results.
>
> One problem I see with eagerly writing segments to Pinot is that downstream
> systems might already start consuming the results even though they might
> still change because of a recovery. The way Flink solves this problem is to
> only publish results once a checkpoint has been completed.
>
> Cheers,
> Till
>
> On Mon, Jan 25, 2021 at 4:30 PM Poerschke, Mats <
> [hidden email]> wrote:
>
> > Hi all,
> >
> > We want to give you a short update on the Pinot Sink since we started
> > implementing a PoC.
> > As described earlier, we aim to use batch-uploading of segments to Pinot
> > in combination with caching elements in the Flink sink.
> >
> > Our current implementation works like this:
> >
> > Besides the pinot controller URI and the target table’s name, the sink
> > allows users to define the max number of rows per segment.
> >
> > The PinotWriter is responsible for collecting elements, building segments
> > and uploading them to Pinot. It therefore retrieves the Schema and
> > TableConfig via the Pinot Controller API using the provided tableName.
> > Whenever the specified maximum number of rows is reached, it starts the
> > segment creation on disk. This process is handled by the Pinot
> admin-tool.
> > A segmentID is structured as follows:
> > <table-name>-<subtask-id>-<incremental-counter>
> > Finally, the PinotWriter pushes the created segment to the Pinot
> > Controller, which then distributes it onto the Pinot Servers.
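The segment ID scheme described above can be illustrated with two small helpers (hypothetical Python, not part of the PoC code):

```python
# Illustrative helpers for the segment ID scheme described above:
# <table-name>-<subtask-id>-<incremental-counter>.

def build_segment_id(table_name: str, subtask_id: int, counter: int) -> str:
    return f"{table_name}-{subtask_id}-{counter}"

def parse_segment_id(segment_id: str):
    # Split from the right so table names containing '-' stay intact.
    table_name, subtask_id, counter = segment_id.rsplit("-", 2)
    return table_name, int(subtask_id), int(counter)

seg_id = build_segment_id("myTable", 3, 7)
assert seg_id == "myTable-3-7"
assert parse_segment_id("my-table-3-7") == ("my-table", 3, 7)
```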
> >
> > The PinotCommitter only checkpoints the segment ID of the segment that
> > was last written. It is possible that multiple segments were uploaded
> > to Pinot between two checkpoints.
> >
> > As for future plans, we want to prevent high memory pressure when
> > collecting elements in the PinotWriter by directly writing elements to
> > disk. The main question at this point is whether we can assume to have
> > access to a disk temp directory.
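A minimal sketch of the spilling idea, assuming the task does have access to a local temp directory (this is a hypothetical helper, not the actual PinotWriter):

```python
# Sketch of spilling collected rows to a temporary directory instead of
# keeping them in memory, assuming a local temp directory is available
# (here provided by the tempfile module).

import json
import os
import tempfile

class SpillingBuffer:
    def __init__(self):
        self.dir = tempfile.mkdtemp(prefix="pinot-sink-")
        self.count = 0

    def add(self, row: dict):
        # Each row is written to its own file; a real implementation
        # would batch rows into larger files.
        path = os.path.join(self.dir, f"row-{self.count}.json")
        with open(path, "w") as f:
            json.dump(row, f)
        self.count += 1

    def read_all(self):
        # Rows are read back in insertion order for segment creation.
        for i in range(self.count):
            with open(os.path.join(self.dir, f"row-{i}.json")) as f:
                yield json.load(f)

buf = SpillingBuffer()
buf.add({"user": "a", "clicks": 1})
buf.add({"user": "b", "clicks": 2})
assert list(buf.read_all()) == [{"user": "a", "clicks": 1},
                                {"user": "b", "clicks": 2}]
```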
> >
> > For checkpointing and failure recovery, we have also thought of an
> > approach that we have not yet tried. Upon recovery from a checkpoint,
> > the latest segment ID stored in the checkpoint can be accessed by the
> > PinotSink. The PinotWriter then compares the incremental counter value
> > of the checkpointed segment ID with the segments that already exist in
> > Pinot for the same table and subtask ID. If segments with a higher
> > counter value in their IDs are discovered, they are deleted to avoid
> > duplicates. After that, processing can continue as described above. We
> > think this mode of recovery assumes that elements from upstream Flink
> > tasks always arrive at the same subtask of the sink. Is this a fair
> > assumption?
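The recovery check sketched above could look roughly like this (illustrative Python only; function names and the listing of existing segments are assumptions, not actual Pinot API calls):

```python
# Sketch of the recovery check described above: given the segment ID
# restored from the checkpoint, find already existing segments for the
# same table and subtask whose counter is higher; these would be
# deleted to avoid duplicates.

def parse(segment_id: str):
    table, subtask, counter = segment_id.rsplit("-", 2)
    return table, int(subtask), int(counter)

def segments_to_delete(checkpointed_id: str, existing_ids: list):
    table, subtask, last_counter = parse(checkpointed_id)
    stale = []
    for seg_id in existing_ids:
        t, s, c = parse(seg_id)
        # Only segments of the same table and subtask with a counter
        # beyond the checkpointed one were pushed after the checkpoint.
        if t == table and s == subtask and c > last_counter:
            stale.append(seg_id)
    return stale

existing = ["tbl-0-1", "tbl-0-2", "tbl-0-3", "tbl-1-5"]
# Checkpoint says subtask 0 last committed counter 2; counter 3 was
# pushed after that checkpoint and must be removed on recovery.
assert segments_to_delete("tbl-0-2", existing) == ["tbl-0-3"]
```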
> >
> > Best regards,
> > Jakob and Mats
> >
> >
> > On 6. Jan 2021, at 18:22, Yupeng Fu <[hidden email]> wrote:
> >
> > Hi Mats,
> >
> > Glad to see this interest! We at Uber are also working on a Pinot sink
> > (for BATCH execution), with some help from the Pinot community on
> > abstracting Pinot interfaces for segment writes and catalog retrieval.
> > Perhaps we can collaborate on this proposal/POC.
> >
> > Cheers,
> >
> > Yupeng
> >
> >
> >
> > On Wed, Jan 6, 2021 at 6:12 AM Aljoscha Krettek <[hidden email]> wrote:
> >
> > That's good to hear. I wasn't sure because the explanation focused a lot
> > on checkpoints and their details, while with the new Sink interface
> > implementers don't need to be concerned with those. And in fact, when
> > the Sink is used in BATCH execution mode there will be no checkpoints.
> >
> > Other than that, the implementation sketch makes sense to me. I think to
> > make further assessments you will probably have to work on a
> > proof-of-concept.
> >
> > Best,
> > Aljoscha
> >
> > On 2021/01/06 11:18, Poerschke, Mats wrote:
> > Yes, we will use the latest sink interface.
> >
> > Best,
> > Mats
> >
> > On 6. Jan 2021, at 11:05, Aljoscha Krettek <[hidden email]> wrote:
> >
> > It's great to see interest in this. Were you planning to use the new
> > Sink interface that we recently introduced? [1]
> >
> > Best,
> > Aljoscha
> >
> > [1] https://s.apache.org/FLIP-143
> >
> > On 2021/01/05 12:21, Poerschke, Mats wrote:
> > Hi all,
> >
> > we want to contribute a sink connector for Apache Pinot. The following
> > briefly describes the planned control flow. Please feel free to comment
> > on any of its aspects.
> >
> > Background
> > Apache Pinot is a large-scale real-time data ingestion engine that
> > internally works on data segments. The controller exposes an external
> > API which allows posting new segments via a REST call. Each posted
> > segment must contain an ID (called the segment name).
> >
> > Control Flow
> > The Flink sink will collect data tuples locally. When creating a
> > checkpoint, all those tuples are grouped into one segment which is then
> > pushed to the Pinot controller. We will assign each pushed segment a
> > unique incrementing identifier.
> > After receiving a success response from the Pinot controller, the
> > latest segment name is persisted within the Flink checkpoint.
> > In case we have to recover from a failure, the latest successfully
> > pushed segment name can be reconstructed from the Flink checkpoint. At
> > this point the system might be in an inconsistent state: the Pinot
> > controller might already have stored a newer segment whose name was,
> > due to the failure, not persisted in the Flink checkpoint.
> > This inconsistency is resolved with the next successful checkpoint
> > creation. The segment pushed then is assigned the same segment name as
> > the inconsistent segment. Thus, Pinot replaces the old segment with the
> > new one, which prevents introducing duplicate entries.
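The replacement semantics relied on here can be modeled with a toy segment store keyed by segment name (this is an illustration of the idea, not how Pinot is implemented):

```python
# Toy model of the replacement semantics described above: a segment
# store keyed by segment name, where pushing a segment under an
# existing name replaces the old one instead of creating a duplicate.

store = {}

def push_segment(name: str, rows):
    store[name] = rows  # same name => the old segment is replaced

push_segment("myTable-0-1", ["row_a"])            # pushed before the failure
push_segment("myTable-0-1", ["row_a", "row_b"])   # re-pushed after recovery
assert store["myTable-0-1"] == ["row_a", "row_b"]
assert len(store) == 1   # no duplicate segment was introduced
```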
> >
> >
> > Best regards
> > Mats Pörschke
> >
> >