Hi there,
I came across Flink and Flink SQL and am using Flink SQL for stream processing. Flink runs as a 3-node cluster with embedded ZooKeeper, with an 80 GB heap on each node. I ran into a few issues and would like some clarification.

- Job1: uses Flink (Java) to read and flatten my JSON and write it to a Kafka topic.
- Job2: uses an environment file configured to read from 2 different Kafka topics. I can join both tables and the join works, but the query runs for a while (say an hour) and then fails with the error below.

Questions:
1. How long does the data reside in my table once I read it? I consume 100 GB per day, so there should be a retention policy, right? If so, where and how do I configure it?
2. Are retention policies specific to tables?
3. I have a data set that updates once a day. How about using UPSERT mode? If so, how can I delete the existing data set to load the new one?

*Query*:
SELECT s.* FROM sourceKafka AS s INNER JOIN badIp AS b ON s.`source.ip` = b.ip;

*Error*:
org.apache.flink.util.FlinkException: The assigned slot e57d1c0556b4a197eb44d7d9e83e1a47_6 was removed.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlot(SlotManagerImpl.java:958)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlots(SlotManagerImpl.java:928)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalUnregisterTaskManager(SlotManagerImpl.java:1149)

*Environment File*:
#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables: # empty list
# A typical table source definition looks like:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"        # required: valid connector versions are
                                  #   "0.8", "0.9", "0.10", "0.11", and "universal"
      topic: recon-data-flatten   # required: topic name from which the table is read
      properties:                 # optional: connector specific properties
        - key: zookeeper.connect
          value: 1.2.4.1:2181
        - key: bootstrap.servers
          value: 1.2.4.1:9092
        - key: group.id
          value: reconDataGroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
              type: 'string'
            },
            'source.port': {
              type: 'string'
            },
            'destination.ip': {
              type: 'string'
            },
            'destination.port': {
              type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR
      - name: 'destination.ip'
        type: VARCHAR
      - name: 'destination.port'
        type: VARCHAR
  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 3
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  # current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  #current-database: default_database
  # controls how table programs are restarted in case of failures
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback
    #attempts: 3
    #delay: 5000

#==============================================================================
# Configuration options
#==============================================================================

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.
configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted to.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
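Job1's flattening step can be sketched in plain Java for reference. This is a minimal sketch, assuming the JSON has already been parsed into nested `Map` objects by some JSON library (not shown); `JsonFlattener` is a hypothetical name, not a Flink class, and the real job may treat arrays differently. It turns `{"source": {"ip": ...}}` into a single field named `source.ip`, which is why the query has to quote that column with backticks.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Flink: flattens nested Maps
// (an already-parsed JSON object) into dotted keys such as "source.ip".
final class JsonFlattener {
    private JsonFlattener() {}

    static Map<String, Object> flatten(Map<String, ?> nested) {
        // LinkedHashMap keeps the original field order in the output.
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto("", nested, out);
        return out;
    }

    private static void flattenInto(String prefix, Map<String, ?> node, Map<String, Object> out) {
        for (Map.Entry<String, ?> entry : node.entrySet()) {
            String key = prefix.isEmpty() ? entry.getKey() : prefix + "." + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Nested object: recurse, extending the dotted prefix.
                @SuppressWarnings("unchecked")
                Map<String, ?> child = (Map<String, ?>) value;
                flattenInto(key, child, out);
            } else {
                // Leaf (string, number, list, null): copy as-is.
                out.put(key, value);
            }
        }
    }
}
```

Because the dot becomes part of the field name, the same backtick quoting applies to any query that references `source.port`, `destination.ip` or `destination.port`.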
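To make the result of the query concrete, here is a plain-Java sketch (no Flink involved) of what `INNER JOIN ... ON s.`source.ip` = b.ip` returns when the bad-IP side is a static list. It loads the IPs while skipping lines that start with `#`, similar in spirit to `comment-prefix: "#"` on the `badips` table, and keeps only records whose `source.ip` is in that set. `BadIpFilter` is a hypothetical name, and the sketch assumes each IP appears at most once in the file (a real inner join emits one row per matching duplicate). It says nothing about how Flink keeps the two join inputs in state.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical plain-Java illustration of the join result; not Flink code.
final class BadIpFilter {
    private BadIpFilter() {}

    // One IP per line; skips blank lines and lines starting with "#".
    static Set<String> loadBadIps(List<String> lines) {
        Set<String> ips = new HashSet<>();
        for (String line : lines) {
            String ip = line.trim();
            if (ip.isEmpty() || ip.startsWith("#")) {
                continue;
            }
            ips.add(ip);
        }
        return ips;
    }

    // Keeps records whose "source.ip" is a bad IP: the rows an INNER JOIN
    // on s.`source.ip` = b.ip would return, assuming no duplicate IPs.
    static List<Map<String, String>> join(List<Map<String, String>> records, Set<String> badIps) {
        List<Map<String, String>> matches = new ArrayList<>();
        for (Map<String, String> record : records) {
            if (badIps.contains(record.get("source.ip"))) {
                matches.add(record);
            }
        }
        return matches;
    }
}
```

Outside of Flink, the lines could be read with `java.nio.file.Files.readAllLines(Paths.get("/home/ipsum/levels/badips.csv"))`.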
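Question 1 is tied to `min-idle-state-retention` and `max-idle-state-retention`, which are both `0` in the environment file above. My understanding of the Flink documentation is that `0` disables idle state cleanup, so a regular (non-windowed) join keeps rows from both inputs in state indefinitely; with non-zero values, state for a key is kept for at least the minimum after it was last touched and removed no later than the maximum. The following is a simplified, hypothetical model of that rule in plain Java (`IdleStateModel` is not a Flink class), assuming processing time and a single cleanup deadline per key.

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Simplified, hypothetical model of idle state retention (not Flink code).
// A key's state is never dropped before it has been idle for minRetentionMs
// and is dropped no later than maxRetentionMs after its last update.
final class IdleStateModel {
    private final long minRetentionMs;
    private final long maxRetentionMs;
    private final Map<String, String> state = new HashMap<>();
    private final Map<String, Long> cleanupAt = new HashMap<>();

    IdleStateModel(long minRetentionMs, long maxRetentionMs) {
        if (minRetentionMs < 0 || maxRetentionMs < minRetentionMs) {
            throw new IllegalArgumentException("expected 0 <= min <= max");
        }
        this.minRetentionMs = minRetentionMs;
        this.maxRetentionMs = maxRetentionMs;
    }

    // Mirrors the 0/0 setting in the environment file: no cleanup at all.
    boolean cleanupEnabled() {
        return maxRetentionMs > 0;
    }

    void update(String key, String value, long nowMs) {
        state.put(key, value);
        if (!cleanupEnabled()) {
            return;
        }
        Long deadline = cleanupAt.get(key);
        // Move the deadline later only if the current one would fire before
        // the key has been idle for at least minRetentionMs.
        if (deadline == null || nowMs + minRetentionMs > deadline) {
            cleanupAt.put(key, nowMs + maxRetentionMs);
        }
    }

    // Drops every key whose deadline has been reached at time nowMs.
    void advanceTo(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = cleanupAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMs) {
                state.remove(entry.getKey());
                it.remove();
            }
        }
    }

    int size() {
        return state.size();
    }
}
```

The gap between min and max lets the deadline be moved only occasionally instead of on every update, at the cost of keeping some idle keys a little longer.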
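On question 3, the difference between upsert-style updates and replacing the whole daily data set can be illustrated with a small plain-Java sketch. It is hypothetical (`DailyBadIpTable` is not a Flink API) and only shows the semantics: with upserts, an IP that disappears from the new day's file stays in the table until a delete is issued for it, whereas a full replace drops it in one step. It does not show how either approach is configured in Flink.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of two ways to refresh a daily data set;
// not a Flink API.
final class DailyBadIpTable {
    private Set<String> rows = Collections.emptySet();

    // Upsert-style: adds every IP in the new batch, but never removes
    // an IP that is missing from it.
    void upsertAll(Set<String> ips) {
        Set<String> next = new HashSet<>(rows);
        next.addAll(ips);
        rows = Collections.unmodifiableSet(next);
    }

    // With upserts, removing a stale IP needs an explicit delete.
    void delete(String ip) {
        Set<String> next = new HashSet<>(rows);
        next.remove(ip);
        rows = Collections.unmodifiableSet(next);
    }

    // Full reload: the new day's set replaces the old one in one step.
    void replaceAll(Set<String> ips) {
        rows = Collections.unmodifiableSet(new HashSet<>(ips));
    }

    boolean contains(String ip) {
        return rows.contains(ip);
    }
}
```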