Hi all,
we want to contribute a sink connector for Apache Pinot. The following briefly describes the planned control flow. Please feel free to comment on any of its aspects.

Background
Apache Pinot is a large-scale real-time data ingestion engine that works on data segments internally. The controller exposes an external API which allows posting new segments via REST call. A segment posted this way must contain an id (called the segment name).

Control Flow
The Flink sink will collect data tuples locally. When creating a checkpoint, all those tuples are grouped into one segment which is then pushed to the Pinot controller. We will assign each pushed segment a unique incrementing identifier.
After receiving a success response from the Pinot controller, the latest segment name is persisted within the Flink checkpoint.
In case we have to recover from a failure, the latest successfully pushed segment name can be reconstructed from the Flink checkpoint. At this point the system might be in an inconsistent state: the Pinot controller might have already stored a newer segment whose name was, due to the failure, not persisted in the Flink checkpoint.
This inconsistency is resolved with the next successful checkpoint creation. The segment pushed then is assigned the same segment name as the inconsistent segment, so Pinot replaces the old segment with the new one, which prevents introducing duplicate entries.

Best regards
Mats Pörschke
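For illustration, a minimal sketch of that incrementing-identifier scheme, with all names hypothetical (this is not the proposed connector code):

// Minimal sketch (names hypothetical) of the incrementing identifier scheme:
// the counter advances once per pushed segment, and its latest value is what
// would be persisted in the Flink checkpoint after a successful push.
public final class SegmentNameCounter {

    private final String tableName;
    private long counter;

    public SegmentNameCounter(String tableName, long restoredCounter) {
        this.tableName = tableName;
        this.counter = restoredCounter; // 0 on a fresh start, else from the checkpoint
    }

    /** Name for the next segment to push, e.g. "myTable-42". */
    public String nextSegmentName() {
        return tableName + "-" + (++counter);
    }

    /** Value to store in the checkpoint once the controller acknowledged the push. */
    public long snapshotCounter() {
        return counter;
    }
}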
Just as a short addition: We plan to contribute the sink to Apache Bahir.
Best regards
Mats Pörschke
It's great to see interest in this. Were you planning to use the new Sink interface that we recently introduced? [1]

Best,
Aljoscha

[1] https://s.apache.org/FLIP-143
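For reference, a rough outline of how a Pinot sink could map onto the FLIP-143 API (interface shape as of Flink 1.12; all Pinot-specific parts below are stubbed, hypothetical placeholders, not the proposed implementation):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

import org.apache.flink.api.connector.sink.Committer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;

// Committables are segment names (String); no writer state or global commit.
public class PinotSink<IN> implements Sink<IN, String, Void, Void> {

    @Override
    public SinkWriter<IN, String, Void> createWriter(InitContext context, List<Void> states) {
        return new SinkWriter<IN, String, Void>() {
            private final List<IN> buffer = new ArrayList<>();

            @Override
            public void write(IN element, Context context) {
                buffer.add(element); // collect tuples locally
            }

            @Override
            public List<String> prepareCommit(boolean flush) {
                // Build a segment from the buffered tuples here and hand its
                // name to the committer (segment creation/push elided).
                buffer.clear();
                return Collections.singletonList("segment-name-placeholder");
            }

            @Override
            public List<Void> snapshotState() {
                return Collections.emptyList();
            }

            @Override
            public void close() {
            }
        };
    }

    @Override
    public Optional<Committer<String>> createCommitter() {
        return Optional.of(new Committer<String>() {
            @Override
            public List<String> commit(List<String> segmentNames) {
                // Publish/register the segments with the Pinot controller
                // here; an empty return means nothing needs to be retried.
                return Collections.emptyList();
            }

            @Override
            public void close() {
            }
        });
    }

    @Override
    public Optional<GlobalCommitter<String, Void>> createGlobalCommitter() {
        return Optional.empty();
    }

    @Override
    public Optional<SimpleVersionedSerializer<String>> getCommittableSerializer() {
        return Optional.empty(); // a real sink must serialize its committables
    }

    @Override
    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
        return Optional.empty();
    }

    @Override
    public Optional<SimpleVersionedSerializer<Void>> getWriterStateSerializer() {
        return Optional.empty();
    }
}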
Yes, we will use the latest sink interface.
Best,
Mats
That's good to hear. I wasn't sure because the explanation focused a lot on checkpoints and their details, while with the new Sink interface implementers don't need to be concerned with those. And in fact, when the Sink is used in BATCH execution mode there will be no checkpoints.

Other than that, the implementation sketch makes sense to me. I think to make further assessments you will probably have to work on a proof-of-concept.

Best,
Aljoscha
Hi Mats,
Glad to see this interest! We at Uber are also working on a Pinot sink (for BATCH execution), with some help from the Pinot community on abstracting Pinot interfaces for segment writes and catalog retrieval. Perhaps we can collaborate on this proposal/POC.

Cheers,
Yupeng
+1 As Yupeng mentioned, we at Uber are also looking into the Pinot Sink. It would be great to collaborate on this proposal.

Thanks,
Sanath
Hi all,
We want to give you a short update on the Pinot Sink since we started implementing a PoC. As described earlier, we aim to use batch-uploading of segments to Pinot in combination with caching elements in the Flink sink.

Our current implementation works like this:

Besides the Pinot controller URI and the target table's name, the sink allows users to define the max number of rows per segment.

The PinotWriter is responsible for collecting elements, building segments and uploading them to Pinot. To do so, it retrieves the Schema and TableConfig via the Pinot Controller API using the provided tableName. Whenever the specified maximum number of rows is reached, it starts the segment creation on disk. This process is handled by the Pinot admin tool. A segment ID is structured as follows:
<table-name>-<subtask-id>-<incremental-counter>
Finally, the PinotWriter pushes the created segment to the Pinot Controller, which then distributes it onto the Pinot Servers.

The PinotCommitter only checkpoints the segment ID of the segment that was last written. It is possible that multiple segments were uploaded to Pinot between two checkpoints.

As for future plans, we want to prevent high memory pressure when collecting elements in the PinotWriter by directly writing elements to disk. The main question at this point is whether we can assume to have access to a disk temp directory.

For checkpointing and failure recovery we have also thought of an approach, without having tried it yet: upon recovery from a checkpoint, the latest segment ID stored in the checkpoint can be accessed by the PinotSink. The PinotWriter then compares the incremental counter value of the checkpointed segment ID with segments that already exist in Pinot for the same table and subtask ID. If segments with a higher counter value in their IDs are discovered, they are deleted to avoid duplicates. After that, processing can continue as described above. We think that this mode of recovery assumes that elements from upstream Flink tasks always arrive at the same subtask of the sink. Is this fair?

Best regards,
Jakob and Mats
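For illustration, a self-contained sketch of the segment-ID scheme and the recovery pruning described above; PinotControllerClient is a hypothetical stand-in for the relevant Pinot Controller REST calls, not an actual Pinot API:

import java.util.List;

public final class SegmentIds {

    // Builds IDs of the form <table-name>-<subtask-id>-<incremental-counter>.
    static String segmentId(String table, int subtaskId, long counter) {
        return table + "-" + subtaskId + "-" + counter;
    }

    // Extracts the trailing incremental counter from a segment ID.
    static long counterOf(String segmentId) {
        return Long.parseLong(segmentId.substring(segmentId.lastIndexOf('-') + 1));
    }

    /**
     * On recovery, delete every segment of this table/subtask whose counter is
     * higher than the last one recorded in the checkpoint, so that re-pushed
     * segments do not duplicate rows.
     */
    static void pruneAfterRecovery(PinotControllerClient client, String table,
                                   int subtaskId, long checkpointedCounter) {
        String prefix = table + "-" + subtaskId + "-";
        for (String id : client.listSegments(table)) {
            if (id.startsWith(prefix) && counterOf(id) > checkpointedCounter) {
                client.deleteSegment(table, id);
            }
        }
    }

    /** Hypothetical minimal client interface; stands in for REST calls. */
    interface PinotControllerClient {
        List<String> listSegments(String table);
        void deleteSegment(String table, String segmentId);
    }
}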
Hi Mats and Jakob,
In the general case, I don't think that elements from upstream Flink tasks always arrive at the same subtask of the sink. One problem is that user computations can be non-deterministic. Moreover, a rebalance operation can distribute the events of a task A among several downstream tasks B_1, B_2. In this case, records are distributed in a round-robin fashion and therefore depend on the arrival order at task A:

event_2, event_1 => A => event_1 => B_1
                      => event_2 => B_2

event_1, event_2 => A => event_2 => B_1
                      => event_1 => B_2

Since the order in which events from multiple producers arrive at the consumer is not deterministic (e.g. due to network delays), you might see different distributions.

However, I am not sure whether this is really a problem if every Pinot sink makes sure that all segments written after the last checkpoint you have recovered from are deleted. You might just end up with a different job result which is a member of the space of valid results.

One problem I see with eagerly writing segments to Pinot is that downstream systems might already start consuming the results even though they might still change because of a recovery. The way Flink solves this problem is to only publish results once a checkpoint has been completed.

Cheers,
Till
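For illustration, a minimal, library-free sketch of that publish-on-checkpoint-completion pattern; the storage operations are placeholders, not Pinot or Flink API:

import java.util.ArrayList;
import java.util.List;

// Segments are staged in a non-queryable form while processing, and only the
// commit step — run after the checkpoint completes — makes them visible, so
// downstream consumers never see results that a recovery could retract.
public final class StagedSegmentPublisher {

    private final List<String> staged = new ArrayList<>();

    /** During processing: create the segment, but keep it non-queryable. */
    public void stage(String segmentName) {
        // e.g. upload to a temp location or push under a "staged" marker
        staged.add(segmentName);
    }

    /** After the checkpoint completed: flip visibility in one step. */
    public List<String> publishAll() {
        List<String> published = new ArrayList<>(staged);
        // e.g. rename/register each staged segment with the controller
        staged.clear();
        return published;
    }

    /** On recovery: drop anything staged but never published. */
    public void discardStaged() {
        staged.clear();
    }
}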
Hi Mats and Jakob,
+1 to what Till said about non-deterministic behavior.

Also, I suggest you only look at Pinot's offline segment creation from Flink. Pinot provides an inbuilt lambda architecture and has real-time and offline segments per table (architecture diagram <https://docs.pinot.apache.org/basics/concepts#pinot-components>). The real-time server ingests the streams (e.g. Kafka) directly and buffers in memory, so that the buffer can serve queries within seconds of the events being produced to the stream. If we use Flink to buffer and flush to Pinot upon reaching a threshold, then it defeats the purpose of real-time serving and data freshness. So it makes more sense to use Flink for the offline segment creation in Pinot only.

In a typical real-life production environment, Pinot offline segments are generated periodically (e.g. daily) from a scheduled job. If we limit the scope to this, then it's easier to solve the deterministic issue that Till mentioned: the entire job can be relaunched to overwrite the previous batch of offline segments.

Also, there are existing conventions for the segment name, and I suggest you use the SegmentNameGenerator <https://github.com/apache/incubator-pinot/blob/master/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java> to create the segment. In particular, it's important that the segment name contains the time range (usually the start/end of the day), so that you can overwrite the entire batch of segments by identifying the range of the offline table.

Lastly, a design doc would be helpful and I'm happy to contribute/review.

Best,
Yupeng
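For illustration, a hedged usage sketch of the SimpleSegmentNameGenerator linked above (exact constructor/method signatures may differ between Pinot versions; treat this as illustrative, not authoritative):

import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;

public class SegmentNamingExample {

    public static void main(String[] args) {
        // Table name plus an optional postfix (e.g. identifying the producing job).
        SimpleSegmentNameGenerator generator =
                new SimpleSegmentNameGenerator("myTable", "flinkSink");

        // Sequence id plus the min/max time values of the rows in the segment;
        // embedding the day's start/end lets a re-run overwrite the whole batch.
        String name = generator.generateSegmentName(0, "2021-01-25", "2021-01-25");

        // Expected shape, roughly: myTable_2021-01-25_2021-01-25_flinkSink_0
        System.out.println(name);
    }
}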