As per the docs, in Batch mode, dynamic memory allocation is avoided by storing messages being processed in ByteBuffers via Unsafe methods.
Couldn't find any docs describing mem mgmt in Streamingn mode. So... - Am wondering if this is also the case with Streaming ? - If so, how does Flink detect that an object is no longer being used and can be reclaimed for reuse once again ? -roshan |
In streaming, memory is mainly needed for state (key/value state). The
exact representation depends on the chosen StateBackend. State is explicitly released: For windows, state is cleaned up automatically (firing / expiry), for user-defined state, keys have to be explicitly cleared (clear() method) or in the future will have the option to expire. The heavy work horse for streaming state is currently RocksDB, which internally uses native (off-heap) memory to keep the data. Does that help? Stephan On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> wrote: > As per the docs, in Batch mode, dynamic memory allocation is avoided by > storing messages being processed in ByteBuffers via Unsafe methods. > > Couldn't find any docs describing mem mgmt in Streamingn mode. So... > > - Am wondering if this is also the case with Streaming ? > > - If so, how does Flink detect that an object is no longer being used and > can be reclaimed for reuse once again ? > > -roshan > |
Hi Stephan,
Just wanted to jump into this discussion regarding state. So do you mean that if we maintain user-defined state (for non-window operators), then if we do not clear it explicitly will the data for that key remains in RocksDB. What happens in case of checkpoint ? I read in the documentation that after the checkpoint happens the rocksDB data is pushed to the desired location (hdfs or s3 or other fs), so for user-defined state does the data still remain in RocksDB after checkpoint ? Correct me if I have misunderstood this concept For one of our use we were going for this, but since I read the above part in documentation so we are going for Cassandra now (to store records and query them for a special case) Regards, Vinay Patil On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote: > In streaming, memory is mainly needed for state (key/value state). The > exact representation depends on the chosen StateBackend. > > State is explicitly released: For windows, state is cleaned up > automatically (firing / expiry), for user-defined state, keys have to be > explicitly cleared (clear() method) or in the future will have the option > to expire. > > The heavy work horse for streaming state is currently RocksDB, which > internally uses native (off-heap) memory to keep the data. > > Does that help? > > Stephan > > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> > wrote: > > > As per the docs, in Batch mode, dynamic memory allocation is avoided by > > storing messages being processed in ByteBuffers via Unsafe methods. > > > > Couldn't find any docs describing mem mgmt in Streamingn mode. So... > > > > - Am wondering if this is also the case with Streaming ? > > > > - If so, how does Flink detect that an object is no longer being used and > > can be reclaimed for reuse once again ? > > > > -roshan > > > |
Hi Vinaj,
if you use user-defined state, you have to manually clear it. Otherwise, it will stay in the state backend (heap or RocksDB) until the job goes down (planned or due to an OOM error). This is esp. important to keep in mind, when using keyed state. If you have an unbounded, evolving key space you will likely run out-of-memory. The job will constantly add state for each new key but won't be able to clean up the state for "expired" keys. You could implement a clean-up mechanism this if you implement a custom stream operator. However this is a very low level interface and requires solid understanding of the internals like timestamps, watermarks and the checkpointing mechanism. The community is currently working on a state expiry feature (state will be discarded if not requested or updated for x minutes). Regarding the second question: Does state remain local after checkpointing? Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but remains in the operator. So the state is not gone after a checkpoint is completed. Hope this helps, Fabian 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>: > Hi Stephan, > > Just wanted to jump into this discussion regarding state. > > So do you mean that if we maintain user-defined state (for non-window > operators), then if we do not clear it explicitly will the data for that > key remains in RocksDB. > > What happens in case of checkpoint ? I read in the documentation that after > the checkpoint happens the rocksDB data is pushed to the desired location > (hdfs or s3 or other fs), so for user-defined state does the data still > remain in RocksDB after checkpoint ? > > Correct me if I have misunderstood this concept > > For one of our use we were going for this, but since I read the above part > in documentation so we are going for Cassandra now (to store records and > query them for a special case) > > > > > > Regards, > Vinay Patil > > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote: > > > In streaming, memory is mainly needed for state (key/value state). The > > exact representation depends on the chosen StateBackend. > > > > State is explicitly released: For windows, state is cleaned up > > automatically (firing / expiry), for user-defined state, keys have to be > > explicitly cleared (clear() method) or in the future will have the option > > to expire. > > > > The heavy work horse for streaming state is currently RocksDB, which > > internally uses native (off-heap) memory to keep the data. > > > > Does that help? > > > > Stephan > > > > > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> > > wrote: > > > > > As per the docs, in Batch mode, dynamic memory allocation is avoided by > > > storing messages being processed in ByteBuffers via Unsafe methods. > > > > > > Couldn't find any docs describing mem mgmt in Streamingn mode. So... > > > > > > - Am wondering if this is also the case with Streaming ? > > > > > > - If so, how does Flink detect that an object is no longer being used > and > > > can be reclaimed for reuse once again ? > > > > > > -roshan > > > > > > |
If you use RocksDB, you will not run into OutOfMemory errors.
On Wed, Aug 31, 2016 at 6:34 PM, Fabian Hueske <[hidden email]> wrote: > Hi Vinaj, > > if you use user-defined state, you have to manually clear it. > Otherwise, it will stay in the state backend (heap or RocksDB) until the > job goes down (planned or due to an OOM error). > > This is esp. important to keep in mind, when using keyed state. > If you have an unbounded, evolving key space you will likely run > out-of-memory. > The job will constantly add state for each new key but won't be able to > clean up the state for "expired" keys. > > You could implement a clean-up mechanism this if you implement a custom > stream operator. > However this is a very low level interface and requires solid understanding > of the internals like timestamps, watermarks and the checkpointing > mechanism. > > The community is currently working on a state expiry feature (state will be > discarded if not requested or updated for x minutes). > > Regarding the second question: Does state remain local after checkpointing? > Yes, the local state is only copied to the remote FS (HDFS, S3, ...) but > remains in the operator. So the state is not gone after a checkpoint is > completed. > > Hope this helps, > Fabian > > 2016-08-31 18:17 GMT+02:00 Vinay Patil <[hidden email]>: > > > Hi Stephan, > > > > Just wanted to jump into this discussion regarding state. > > > > So do you mean that if we maintain user-defined state (for non-window > > operators), then if we do not clear it explicitly will the data for that > > key remains in RocksDB. > > > > What happens in case of checkpoint ? I read in the documentation that > after > > the checkpoint happens the rocksDB data is pushed to the desired location > > (hdfs or s3 or other fs), so for user-defined state does the data still > > remain in RocksDB after checkpoint ? > > > > Correct me if I have misunderstood this concept > > > > For one of our use we were going for this, but since I read the above > part > > in documentation so we are going for Cassandra now (to store records and > > query them for a special case) > > > > > > > > > > > > Regards, > > Vinay Patil > > > > On Wed, Aug 31, 2016 at 4:51 AM, Stephan Ewen <[hidden email]> wrote: > > > > > In streaming, memory is mainly needed for state (key/value state). The > > > exact representation depends on the chosen StateBackend. > > > > > > State is explicitly released: For windows, state is cleaned up > > > automatically (firing / expiry), for user-defined state, keys have to > be > > > explicitly cleared (clear() method) or in the future will have the > option > > > to expire. > > > > > > The heavy work horse for streaming state is currently RocksDB, which > > > internally uses native (off-heap) memory to keep the data. > > > > > > Does that help? > > > > > > Stephan > > > > > > > > > On Tue, Aug 30, 2016 at 11:52 PM, Roshan Naik <[hidden email]> > > > wrote: > > > > > > > As per the docs, in Batch mode, dynamic memory allocation is avoided > by > > > > storing messages being processed in ByteBuffers via Unsafe methods. > > > > > > > > Couldn't find any docs describing mem mgmt in Streamingn mode. So... > > > > > > > > - Am wondering if this is also the case with Streaming ? > > > > > > > > - If so, how does Flink detect that an object is no longer being used > > and > > > > can be reclaimed for reuse once again ? > > > > > > > > -roshan > > > > > > > > > > |
Free forum by Nabble | Edit this page |