Sharing operator subtask state using side outputs


vishalovercome
I am implementing a streaming application, and one of the stateful operators
is trying to capture an “owner has items” relationship. The state, keyed per
owner, consists of details about the owner and each of the items. Ownership
of an item can change, and I would like to be able to associate each item
with its correct owner. Since operator state for different owners could be in
different subtasks, and these subtasks are intended to operate independently,
I want to know the best way to share state. One solution I was able to think
of is to create a keyed datastream from the side output of a subtask, send it
to the subtask that holds the new owner, and clear the state from the
original owner. Essentially:

1. Subtask1 holds state about OldOwner, which has Item1, Item2, … , ItemN.
2. Subtask1 writes a message to a side output: (OldOwner, NewOwner,
List[ItemsToTransfer]).
3. (Optional) Clear List[ItemsToTransfer] from the state about OldOwner.
4. Create a datastream from the side output and send it back to the same
operator, but potentially to a different subtask that has state about
NewOwner.
5. Update the state of NewOwner by adding the new set of items.
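
To make the hand-off concrete, the five steps can be modelled in plain Java. This is only a sketch of the protocol and not Flink API code: the names OwnershipTransferSketch, Subtask, Transfer, requestTransfer and drainSideOutput are hypothetical, the queue merely stands in for the keyed stream built from the side output, and the hash-based routing is a simplifying assumption about how owners map to subtasks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java model of the ownership hand-off; all names here are hypothetical.
class OwnershipTransferSketch {

    // Step 2: the message written to the side output.
    static final class Transfer {
        final String oldOwner;
        final String newOwner;
        final List<String> items;

        Transfer(String oldOwner, String newOwner, List<String> items) {
            this.oldOwner = oldOwner;
            this.newOwner = newOwner;
            this.items = items;
        }
    }

    // One parallel instance of the operator; it only sees state for its own owners.
    static final class Subtask {
        final Map<String, Set<String>> itemsByOwner = new HashMap<>();
    }

    final Subtask[] subtasks;
    // Stand-in for the datastream created from the side output.
    final Queue<Transfer> sideOutput = new ArrayDeque<>();

    OwnershipTransferSketch(int parallelism) {
        subtasks = new Subtask[parallelism];
        for (int i = 0; i < parallelism; i++) {
            subtasks[i] = new Subtask();
        }
    }

    // Assumption: owners are routed by a simple hash; a real runtime may differ.
    int subtaskFor(String owner) {
        return Math.floorMod(owner.hashCode(), subtasks.length);
    }

    // Keyed state lookup: the item set of one owner, created empty on first access.
    Set<String> itemsOf(String owner) {
        return subtasks[subtaskFor(owner)]
                .itemsByOwner.computeIfAbsent(owner, k -> new TreeSet<>());
    }

    // Steps 1-3: runs on the subtask that holds OldOwner.
    void requestTransfer(String oldOwner, String newOwner, List<String> items) {
        Set<String> owned = itemsOf(oldOwner);
        List<String> moved = new ArrayList<>();
        for (String item : items) {
            if (owned.remove(item)) {   // step 3: clear the item from OldOwner
                moved.add(item);        // only items OldOwner really had are moved
            }
        }
        if (!moved.isEmpty()) {
            sideOutput.add(new Transfer(oldOwner, newOwner, moved)); // step 2
        }
    }

    // Steps 4-5: each message is routed by NewOwner and applied to that state.
    // Applying a transfer never emits another transfer, so the queue drains.
    void drainSideOutput() {
        Transfer t;
        while ((t = sideOutput.poll()) != null) {
            itemsOf(t.newOwner).addAll(t.items);
        }
    }

    public static void main(String[] args) {
        OwnershipTransferSketch job = new OwnershipTransferSketch(4);
        job.itemsOf("OldOwner").addAll(Arrays.asList("Item1", "Item2", "Item3"));
        job.requestTransfer("OldOwner", "NewOwner", Arrays.asList("Item1", "Item2"));
        job.drainSideOutput();
        System.out.println("OldOwner -> " + job.itemsOf("OldOwner")); // [Item3]
        System.out.println("NewOwner -> " + job.itemsOf("NewOwner")); // [Item1, Item2]
    }
}
```

In this model the items are removed from OldOwner before the message is emitted, so between steps 3 and 5 they belong to no owner. Whether that window is acceptable, and how it behaves across a failure and restore, is exactly the fault-tolerance question raised below; the sketch does not answer it.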

Since side outputs are intended for a very different purpose (logging,
etc.), I want to know whether this will work. Do the same fault tolerance
guarantees apply to side outputs as to regular data streams? Is there a
limit to how many side output messages can be buffered in a subtask?

An alternative approach might be to take the output of the first subtask and
feed it back to the same operator. Both of these approaches, in theory,
violate the property that a Flink job is a DAG, although for my use case
there would never be a cyclic data transfer.



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/