Dian Fu created FLINK-22124:
-------------------------------

             Summary: The job finished without any exception if error was thrown during state access
                 Key: FLINK-22124
                 URL: https://issues.apache.org/jira/browse/FLINK-22124
             Project: Flink
          Issue Type: Sub-task
          Components: API / Python
    Affects Versions: 1.13.0
            Reporter: Dian Fu
             Fix For: 1.13.0


For the following job:
{code}
import logging

from pyflink.common import WatermarkStrategy, Row
from pyflink.common.serialization import Encoder
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FileSink, OutputFileConfig, NumberSequenceSource
from pyflink.datastream.execution_mode import RuntimeExecutionMode
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(2)
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

seq_num_source = NumberSequenceSource(1, 1000)

file_sink = FileSink \
    .for_row_format('/Users/dianfu/code/src/apache/playgrounds/examples/output/data_stream_batch_state',
                    Encoder.simple_string_encoder()) \
    .with_output_file_config(OutputFileConfig.builder().with_part_prefix('pre').with_part_suffix('suf').build()) \
    .build()

ds = env.from_source(
    source=seq_num_source,
    watermark_strategy=WatermarkStrategy.for_monotonous_timestamps(),
    source_name='file_source',
    type_info=Types.LONG())


class MyKeyedProcessFunction(KeyedProcessFunction):

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        logging.info("open")
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
        self.state = runtime_context.get_map_state(state_desc)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        existing = self.state.get(value[0])
        if existing is None:
            result = value[1]
            self.state.put(value[0], result)
        elif existing <= 10:
            result = value[1] + existing
            self.state.put(value[0], result)
        else:
            result = existing
        yield result


ds.map(lambda a: Row(a % 4, 1), output_type=Types.ROW([Types.LONG(), Types.LONG()])) \
    .key_by(lambda a: a[0]) \
    .process(MyKeyedProcessFunction(), Types.LONG()) \
    .sink_to(file_sink)

env.execute('data_stream_batch_state')
{code}

The call `self.state.get(value[0])` encounters a KeyError, so the job should fail; instead it finishes without any error message. This should be addressed: we should make sure the error message appears in the log file to help users figure out what went wrong.
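Until the underlying problem is fixed, a possible user-side mitigation is to make the failure visible explicitly. The sketch below is only an illustration based on the reproducer above, not part of any proposed fix: the `SafeKeyedProcessFunction` name is hypothetical, and it assumes `MapState.contains()` is available to guard the lookup. Any remaining state-access error is logged and re-raised so it at least shows up in the Python worker log instead of being silently swallowed.

{code}
import logging

from pyflink.common.typeinfo import Types
from pyflink.datastream.functions import KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import MapStateDescriptor


class SafeKeyedProcessFunction(KeyedProcessFunction):
    """Variant of MyKeyedProcessFunction that makes state-access failures visible."""

    def __init__(self):
        self.state = None

    def open(self, runtime_context: RuntimeContext):
        state_desc = MapStateDescriptor('map', Types.LONG(), Types.LONG())
        self.state = runtime_context.get_map_state(state_desc)

    def process_element(self, value, ctx: 'KeyedProcessFunction.Context'):
        try:
            key = value[0]
            if not self.state.contains(key):
                # First time we see this key: initialize the state entry.
                result = value[1]
                self.state.put(key, result)
            else:
                existing = self.state.get(key)
                if existing <= 10:
                    result = value[1] + existing
                    self.state.put(key, result)
                else:
                    result = existing
            yield result
        except Exception:
            # Log and re-raise so the failure appears in the Python worker log
            # rather than the job finishing silently.
            logging.exception("state access failed for value %s", value)
            raise
{code}

The try/except is only a workaround for visibility on the user side; the actual fix still needs to ensure that exceptions thrown during state access fail the job and are reported.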