Hi there,
I came across Flink and Flink SQL and am using Flink SQL for stream processing. Flink runs as a 3-node cluster with embedded ZooKeeper, with 80 GB of heap on each node. I ran into a few issues and would like some clarification.

- Job1: Uses Flink (Java) to read and flatten my JSON and write it to a Kafka topic.
- Job2: An environment file configured to read from two different Kafka topics. I join both tables and it works, but the query runs for a while (say an hour) and then fails with the error below.

Questions:

1. How long does the data reside in my table once I read it? I consume 100 GB per day, so there should be a retention policy, right? If so, where and how do I configure it?
2. Are retention policies specific to tables?
3. I have a data set that updates once a day. How about using UPSERT mode? If so, how could I delete the existing data set to load the new one?

*Query*:

SELECT s.* FROM sourceKafka AS s INNER JOIN badIp AS b ON s.`source.ip` = b.ip;

*Error*:

org.apache.flink.util.FlinkException: The assigned slot e57d1c0556b4a197eb44d7d9e83e1a47_6 was removed.
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlot(SlotManagerImpl.java:958)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.removeSlots(SlotManagerImpl.java:928)
    at org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerImpl.internalUnregisterTaskManager(SlotManagerImpl.java:1149)

*Environment file*:

#==============================================================================
# Tables
#==============================================================================

# Define tables here such as sources, sinks, views, or temporal tables.

tables:
  # A typical table source definition looks like:
  - name: sourceKafka
    type: source-table
    update-mode: append
    connector:
      type: kafka
      version: "universal"        # required: valid connector versions are
                                  # "0.8", "0.9", "0.10", "0.11", and "universal"
      topic: recon-data-flatten   # required: topic name from which the table is read
      properties:                 # optional: connector specific properties
        - key: zookeeper.connect
          value: 1.2.4.1:2181
        - key: bootstrap.servers
          value: 1.2.4.1:9092
        - key: group.id
          value: reconDataGroup
    format:
      type: json
      fail-on-missing-field: false
      json-schema: >
        {
          type: 'object',
          properties: {
            'source.ip': {
              type: 'string'
            },
            'source.port': {
              type: 'string'
            },
            'destination.ip': {
              type: 'string'
            },
            'destination.port': {
              type: 'string'
            }
          }
        }
      derive-schema: false
    schema:
      - name: 'source.ip'
        type: VARCHAR
      - name: 'source.port'
        type: VARCHAR
      - name: 'destination.ip'
        type: VARCHAR
      - name: 'destination.port'
        type: VARCHAR

  - name: badips
    type: source-table
    #update-mode: append
    connector:
      type: filesystem
      path: "/home/ipsum/levels/badips.csv"
    format:
      type: csv
      fields:
        - name: ip
          type: VARCHAR
      comment-prefix: "#"
    schema:
      - name: ip
        type: VARCHAR

#==============================================================================
# Execution properties
#==============================================================================

# Properties that change the fundamental execution behavior of a table program.

execution:
  # select the implementation responsible for planning table programs
  # possible values are 'old' (used by default) or 'blink'
  planner: blink
  # 'batch' or 'streaming' execution
  type: streaming
  # allow 'event-time' or only 'processing-time' in sources
  time-characteristic: event-time
  # interval in ms for emitting periodic watermarks
  periodic-watermarks-interval: 200
  # 'changelog' or 'table' presentation of results
  result-mode: table
  # maximum number of maintained rows in 'table' presentation of results
  max-table-result-rows: 1000000
  # parallelism of the program
  parallelism: 3
  # maximum parallelism
  max-parallelism: 128
  # minimum idle state retention in ms
  min-idle-state-retention: 0
  # maximum idle state retention in ms
  max-idle-state-retention: 0
  # current catalog ('default_catalog' by default)
  #current-catalog: default_catalog
  # current database of the current catalog (default database of the catalog by default)
  #current-database: default_database
  # controls how table programs are restarted in case of a failure
  restart-strategy:
    # strategy type
    # possible values are "fixed-delay", "failure-rate", "none", or "fallback" (default)
    type: fallback
    #attempts: 3
    #delay: 5000

#==============================================================================
# Configuration options
#==============================================================================

# A full list of options and their default values can be found
# on the dedicated "Configuration" page.

configuration:
  table.optimizer.join-reorder-enabled: true
  table.exec.spill-compression.enabled: true
  table.exec.spill-compression.block-size: 128kb

#==============================================================================
# Deployment properties
#==============================================================================

# Properties that describe the cluster to which table programs are submitted.

deployment:
  # general cluster communication timeout in ms
  response-timeout: 5000
Hi,
Job1 is a simple ETL job that keeps very little state (only the Kafka offsets), so it should work well. Job2 is an unbounded join, which stores the records from both input streams in the join operator's state. Since the input streams are unbounded and, as you describe, around 100 GB arrives per day, the job will eventually run out of memory if you are using the memory state backend, which is the default one (see the flink-conf.yaml sketch at the end of this mail). Here are my answers:

> 1. How long does the data reside in my table once I read it? I consume 100 GB per day, so there should be a retention policy, right? If so, where and how do I configure it?

The data is stored in Flink state. If you are using the SQL CLI, you can configure the retention policy by setting the "execution: min-idle-state-retention" and "execution: max-idle-state-retention" keys in the environment file [1] (see the sketch below).

> 2. Are retention policies specific to tables?

No. Idle state retention applies to all stateful non-window operations (e.g. GroupBy, Join).

> 3. I have a data set that updates once a day. How about using UPSERT mode? If so, how could I delete the existing data set to load the new one?

Flink SQL does not yet support reloading a periodically changing data set. You might be able to achieve this by implementing a custom source and operators with the DataStream API.

Best,
Jark

[1]: https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sqlClient.html#environment-files
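To make the retention settings from answer 1 concrete, here is a minimal sketch of the execution section with non-zero idle state retention. Both keys take milliseconds, and the 12-hour/24-hour values below are only illustrative assumptions, not values taken from this thread:

execution:
  # minimum idle state retention in ms (here: 12 hours, illustrative)
  min-idle-state-retention: 43200000
  # maximum idle state retention in ms (here: 24 hours, illustrative)
  max-idle-state-retention: 86400000

With such settings, state for a join key that has not been accessed within the retention interval becomes eligible for cleanup, so the join state no longer grows without bound.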
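Since the default memory state backend was mentioned above as the likely cause of the failure, a sketch of switching to the RocksDB state backend via flink-conf.yaml may also help. The checkpoint directory below is a placeholder, and this assumes the RocksDB state backend is bundled with your Flink distribution:

# flink-conf.yaml
# keep large join state on disk (RocksDB) instead of on the JVM heap
state.backend: rocksdb
# placeholder path; point this at a durable, cluster-visible filesystem
state.checkpoints.dir: file:///tmp/flink-checkpoints
# checkpoint only the changes of the RocksDB state, which helps with large state
state.backend.incremental: true

With RocksDB the state size is bounded by local disk rather than the 80 GB heap, although the idle state retention shown above is still needed to keep an unbounded join from growing forever.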