Does Flink have a retention policy?


srikanth flink
Hi there,

I came across Flink and Flink SQL and am using Flink SQL for stream processing.
Flink runs as a 3-node cluster with embedded ZooKeeper, with an 80 GB heap on
each node. I ran into a few issues and would like some clarification.

   - Job1: Using Flink (Java) to read and flatten my JSON and write it to a
   Kafka topic.

   - Job2: The environment file is configured to read from 2 different Kafka
   topics. I join the two tables and the query works, but after running for a
   while (say an hour) it fails with the *error* below.

Questions:

   1. How long does the data reside in my table once I read it? I consume
   100 GB per day, so there should be a retention policy, right? If so, where
   and how do I configure it? (See the sketch after this list.)
   2. Are retention policies specific to tables?
   3. I have a data set that updates once a day. How about using UPSERT mode?
   If so, how could I delete the existing data set before loading the new one?
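
On question 1: Flink SQL has no Kafka-style retention policy; for continuous
queries, state cleanup is governed by the idle state retention settings. In
the SQL Client environment file these are min-idle-state-retention and
max-idle-state-retention (both 0 in the file below, meaning state is never
cleaned up). A minimal sketch of enabling them; the 24 h / 48 h values are
illustrative only, not a recommendation:

  execution:
    # drop state for keys idle for at least 24 h, at the latest after 48 h
    # (values in milliseconds)
    min-idle-state-retention: 86400000
    max-idle-state-retention: 172800000

These settings apply to the query execution, not to individual tables, which
also bears on question 2.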


*Query*: SELECT s.* from sourceKafka AS s INNER JOIN badIp AS b ON
s.`source.ip`=b.ip;
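
Note that this is a regular (non-windowed) join, so Flink keeps rows from both
inputs in state indefinitely unless idle state retention is configured; at
~100 GB/day on the Kafka side, that state grows until a TaskManager dies,
which matches the slot-removal error below. If both inputs carried event-time
attributes, an interval join would bound the state automatically. A sketch,
where the rowtime columns are hypothetical and would have to be declared in
the table schemas (and badips would have to be a stream rather than a static
CSV file):

  SELECT s.*
  FROM sourceKafka AS s
  INNER JOIN badIp AS b
    ON s.`source.ip` = b.ip
    -- rows can be dropped from state once they fall outside the interval
    AND s.`rowtime` BETWEEN b.`rowtime` - INTERVAL '1' HOUR
                        AND b.`rowtime` + INTERVAL '1' HOUR;

For a static lookup table like badips, idle state retention on the Kafka side
is the more direct fix.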
*Error*:
org.apache.flink.util.FlinkException: The assigned slot
e57d1c0556b4a197eb44d7d9e83e1a47_6 was removed.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlot(SlotManagerImpl.java:958)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlots(SlotManagerImpl.java:928)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalUnregisterTaskManager(SlotManagerImpl.java:1149)
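
For context, "assigned slot ... was removed" means the ResourceManager
unregistered a TaskManager, typically because the TaskManager process died
(for example with an OutOfMemoryError from unbounded join state) or because a
long GC pause on the 80 GB heap exceeded the heartbeat timeout. A sketch of
the flink-conf.yaml settings usually involved; the values are illustrative,
not recommendations:

  # total heap per TaskManager; very large heaps make full GC pauses longer
  taskmanager.heap.size: 80g
  # milliseconds the ResourceManager waits for a TaskManager heartbeat
  # before removing its slots (50000 is the default)
  heartbeat.timeout: 50000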

*Environment File*:
#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.
# Define tables here such as sources, sinks, views, or temporal tables.
tables:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"     # required: valid connector versions are
                      #   "0.8", "0.9", "0.10", "0.11", and "universal"
      topic: recon-data-flatten # required: topic name from which the table is read

      properties:         # optional: connector specific properties
        - key: zookeeper.connect
          value: 1.2.4.1:2181
        - key: bootstrap.servers
          value: 1.2.4.1:9092
        - key: group.id
          value: reconDataGroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
              type: 'string'
            },
            'source.port': {
              type: 'string'
            },
            'destination.ip': {
              type: 'string'
            },
            'destination.port': {
              type: 'string'
            }
          }
        }
      derive-schema: false

    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR
      - name: 'destination.ip'
        type: VARCHAR
      - name: 'destination.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 3
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
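  # note: with both values at 0, idle state is never cleaned up; with a
  # regular join over a 100 GB/day stream, state grows without bound
  # (see the illustrative non-zero values in the sketch above)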
  # current catalog ('default_catalog' by default)
  #  current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  #current-database: default_database
  # controls how table programs are restarted in case of failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback
    #attempts: 3
    #delay: 5000

#==============================================================================
# Configuration options
#==============================================================================

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000