Dian Fu created FLINK-17923:
-------------------------------

             Summary: It will throw MemoryAllocationException if rocksdb statebackend and Python UDF are used in the same slot
                 Key: FLINK-17923
                 URL: https://issues.apache.org/jira/browse/FLINK-17923
             Project: Flink
          Issue Type: Bug
          Components: API / Python, Runtime / State Backends
    Affects Versions: 1.10.0, 1.11.0
            Reporter: Dian Fu
             Fix For: 1.11.0


For the following job:
{code}
import logging
import os
import shutil
import sys
import tempfile

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableConfig, StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf


def word_count():
    content = "line Licensed to the Apache Software Foundation ASF under one " \
              "line or more contributor license agreements See the NOTICE file " \
              "line distributed with this work for additional information " \
              "line regarding copyright ownership The ASF licenses this file " \
              "to you under the Apache License Version the " \
              "License you may not use this file except in compliance " \
              "with the License"

    t_config = TableConfig()
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env, t_config)

    # register Results table in table environment
    tmp_dir = tempfile.gettempdir()
    result_path = tmp_dir + '/result'
    if os.path.exists(result_path):
        try:
            if os.path.isfile(result_path):
                os.remove(result_path)
            else:
                shutil.rmtree(result_path)
        except OSError as e:
            logging.error("Error removing directory: %s - %s.", e.filename, e.strerror)

    logging.info("Results directory: %s", result_path)

    sink_ddl = """
        create table Results(
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'blackhole'
        )
        """
    t_env.sql_update(sink_ddl)

    @udf(input_types=[DataTypes.BIGINT()], result_type=DataTypes.BIGINT())
    def inc(count):
        return count + 1

    t_env.register_function("inc", inc)

    elements = [(word, 1) for word in content.split(" ")]
    t_env.from_elements(elements, ["word", "count"]) \
        .group_by("word") \
        .select("word, count(1) as count") \
        .select("word, inc(count) as count") \
        .insert_into("Results")

    t_env.execute("word_count")


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    word_count()
{code}

It will throw the following exception:
{code}
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedProcessOperator_c27dcf7b54ef6bfd6cff02ca8870b681_(1/1) from any of the 1 provided restore options.
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:317)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:144)
	... 9 more
Caused by: java.io.IOException: Failed to acquire shared cache resource for RocksDB
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:212)
	at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:516)
	at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:301)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:142)
	at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:121)
	... 11 more
Caused by: org.apache.flink.runtime.memory.MemoryAllocationException: Could not created the shared memory resource of size 536870920. Not enough memory left to reserve from the slot's managed memory.
	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:603)
	at org.apache.flink.runtime.memory.SharedResources.createResource(SharedResources.java:130)
	at org.apache.flink.runtime.memory.SharedResources.getOrAllocateSharedResource(SharedResources.java:72)
	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:617)
	at org.apache.flink.runtime.memory.MemoryManager.getSharedMemoryResourceForManagedMemory(MemoryManager.java:566)
	at org.apache.flink.contrib.streaming.state.RocksDBOperationUtils.allocateSharedCachesIfConfigured(RocksDBOperationUtils.java:208)
	... 15 more
Caused by: org.apache.flink.runtime.memory.MemoryReservationException: Could not allocate 536870920 bytes. Only 454033416 bytes are remaining.
	at org.apache.flink.runtime.memory.MemoryManager.reserveMemory(MemoryManager.java:461)
	at org.apache.flink.runtime.memory.MemoryManager.lambda$getSharedMemoryResourceForManagedMemory$8(MemoryManager.java:601)
	... 20 more
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)