Dian Fu created FLINK-17946:
------------------------------- Summary: The config option "pipeline.jars" doesn't work if the job was executed via TableEnvironment.execute_sql and StatementSet.execute Key: FLINK-17946 URL: https://issues.apache.org/jira/browse/FLINK-17946 Project: Flink Issue Type: Bug Components: API / Python Reporter: Dian Fu Fix For: 1.11.0 For the following job: {code} import logging import sys import tempfile from pyflink.table import BatchTableEnvironment, EnvironmentSettings def word_count(): content = "line Licensed to the Apache Software Foundation ASF under one " \ "line or more contributor license agreements See the NOTICE file " \ "line distributed with this work for additional information " \ "line regarding copyright ownership The ASF licenses this file " \ "to you under the Apache License Version the " \ "License you may not use this file except in compliance " \ "with the License" environment_settings = EnvironmentSettings.new_instance().in_batch_mode().\ use_blink_planner().build() t_env = BatchTableEnvironment.create(environment_settings=environment_settings) t_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///Users/dianfu/workspace/wordcount_python/flink-csv-1.11.0-sql-jar.jar") # register Results table in table environment tmp_dir = tempfile.gettempdir() result_path = tmp_dir + '/result' logging.info("Results directory: %s", result_path) sink_ddl = """ create table Results( word VARCHAR, `count` BIGINT ) with ( 'connector' = 'filesystem', 'format' = 'csv', 'path' = '{}' ) """.format(result_path) t_env.execute_sql(sink_ddl) elements = [(word, 1) for word in content.split(" ")] table = t_env.from_elements(elements, ["word", "count"]) \ .group_by("word") \ .select("word, count(1) as count") statement_set = t_env.create_statement_set() statement_set.add_insert("Results", table, overwrite=True) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") word_count() {code} It will throw exceptions as following: {code} Caused by: java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.flink.table.filesystem.FileSystemOutputFormat.formatFactory of type org.apache.flink.table.filesystem.OutputFormatFactory in instance of org.apache.flink.table.filesystem.FileSystemOutputFormat at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133) at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at java.util.HashMap.readObject(HashMap.java:1404) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2122) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231) at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155) at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013) at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535) at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511) at org.apache.flink.runtime.operators.util.TaskConfig.getStubWrapper(TaskConfig.java:288) at org.apache.flink.runtime.jobgraph.InputOutputFormatContainer.<init>(InputOutputFormatContainer.java:66) ... 23 more {code} The reason is that the "pipeline.jars" option is not handled properly in StatementSet.execute and this issue also exists in TableEnvironment.execute_sql -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |