Retention policy | Memory management.


srikanth flink
Hi there,

I recently came across Flink and Flink SQL and am using Flink SQL for stream processing.
Flink runs as a 3-node cluster with embedded ZooKeeper, with 80GB of heap given to
each node. I ran into a few issues and would like to get some clarification.

   - Job1: A Flink (Java) job that reads and flattens my JSON and writes it to a
   Kafka topic.

   - Job2: An environment file configured to read from 2 different Kafka
   topics. I join both tables and the join works. The query runs for a
   while (say an hour) and then fails with the *error* below.

Questions:

   1. How long does the data reside in my table once I read it? I consume
   100GB per day, so there should be a retention policy, right? If so, where and
   how do I configure it?
   2. Are retention policies specific to tables?
   3. I have a data set that updates once a day. How about using UPSERT mode?
   If so, how could I delete the existing data set before loading the new one?


*Query*: SELECT s.* from sourceKafka AS s INNER JOIN badIp AS b ON
s.`source.ip`=b.ip;
*Error*: org.apache.flink.util.FlinkException: The assigned slot
e57d1c0556b4a197eb44d7d9e83e1a47_6 was removed.
  at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlot(SlotManagerImpl.java:958)
  at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlots(SlotManagerImpl.java:928)
  at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalUnregisterTaskManager(SlotManagerImpl.java:1149)

*Environment File*:
#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.
tables:
# A typical table source definition looks like:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"     # required: valid connector versions are
                      #   "0.8", "0.9", "0.10", "0.11", and "universal"
      topic: recon-data-flatten          # required: topic name from which the table is read

      properties:         # optional: connector specific properties
        - key: zookeeper.connect
          value: 1.2.4.1:2181
        - key: bootstrap.servers
          value: 1.2.4.1:9092
        - key: group.id
          value: reconDataGroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
              type: 'string'
            },
            'source.port': {
              type: 'string'
            },
            'destination.ip': {
              type: 'string'
            },
            'destination.port': {
              type: 'string'
            }
          }
        }
      derive-schema: false

    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR
      - name: 'destination.ip'
        type: VARCHAR
      - name: 'destination.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 3
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  #  current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  #current-database: default_database
  # controls how table programs are restarted in case of a failure
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback
    #attempts: 3
    #delay: 5000

#==============================================================================
# Configuration options
#==============================================================================

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000

Re: Retention policy | Memory management.

Jark Wu-2
Hi,

Job1 is a simple ETL job that keeps very little state (only the Kafka offsets), so it should work well.
Job2 is an unbounded join, which keeps the data from both input streams in state inside the join operator.
Since the input streams are unbounded and, as you described, grow by 100GB per day, the job will
eventually run out of memory if you are using the memory state backend (which is the default one).
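
If keeping that much state on the heap is the problem, one common option is to move state to the
RocksDB state backend. A minimal flink-conf.yaml sketch (the checkpoint path is a placeholder,
adjust it to your storage):

# use RocksDB so operator state spills to local disk instead of the JVM heap
state.backend: rocksdb
# durable location for checkpoints (placeholder path)
state.checkpoints.dir: file:///tmp/flink-checkpoints
# incremental checkpoints keep checkpoint sizes manageable for large state
state.backend.incremental: true

Even with RocksDB, an unbounded join still grows without a retention policy, so the settings
below still matter.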

Here are my answers:
>  1. How long does the data reside in my table once I read it? I consume
  100GB per day, should have been a retention policy right? If so, where do I
  configure and how?

The data is stored in state. You can specify the retention policy by setting the
"execution: min-idle-state-retention" and "execution: max-idle-state-retention" keys [1]
in the environment file if you are using the SQL CLI.
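
In your environment file both keys are currently 0, which means retention is disabled and join
state grows forever. A minimal sketch with retention enabled (the 12h/24h values are only an
example, choose them based on how long a key may stay idle before its state can be dropped):

execution:
  # minimum idle state retention in ms (here: 12 hours)
  min-idle-state-retention: 43200000
  # maximum idle state retention in ms (here: 24 hours)
  max-idle-state-retention: 86400000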

>  2. Are retention policies specific to tables?

No. It applies to all the stateful non-window operations (e.g. GroupBy, Join).

>   3. I have a data set updates once a day. How about using UPSERT mode?
  If so, how could I delete the existing data set to load the new?

Flink SQL doesn't support loading a periodically changing data set yet. Maybe you can
achieve this by implementing a custom source and operators in the DataStream API; a rough
sketch of such a source follows.
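
A minimal sketch of a source that re-reads a file on a fixed interval (class name, path, and
interval are placeholders; error handling, checkpointing, and retract/upsert semantics are
left out):

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Periodically re-reads a file and emits every line; downstream operators see
// each pass as the latest snapshot of the data set.
public class PeriodicFileSource extends RichSourceFunction<String> {

    private final String path;
    private final long intervalMs;
    private volatile boolean running = true;

    public PeriodicFileSource(String path, long intervalMs) {
        this.path = path;
        this.intervalMs = intervalMs;
    }

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (running) {
            List<String> lines = Files.readAllLines(Paths.get(path));
            // emit under the checkpoint lock so records and checkpoints don't interleave
            synchronized (ctx.getCheckpointLock()) {
                for (String line : lines) {
                    ctx.collect(line);
                }
            }
            Thread.sleep(intervalMs);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}

You would still need to decide how downstream operators replace the previous snapshot with the
new one (for example by keying on ip and keeping only the latest version per key).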

Best,
Jark


[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#environment-files

