[jira] [Created] (FLINK-18770) Emitting element fails in KryoSerializer

Shang Yuanchun (Jira)
Leonid Ilyevsky created FLINK-18770:
---------------------------------------

             Summary: Emitting element fails in KryoSerializer
                 Key: FLINK-18770
                 URL: https://issues.apache.org/jira/browse/FLINK-18770
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.11.1
         Environment: Flink 1.11.1, Linux
            Reporter: Leonid Ilyevsky
         Attachments: KryoException.txt, SolaceSource.java

I wrote a simple Flink connector for Solace (see the attached SolaceSource.java). It works fine in a local execution environment. However, when I deployed it to a real Flink cluster, it failed with a Kryo exception (see the attached KryoException.txt).

After a few hours of searching and debugging, I can now see what is going on.

The data I want to emit from this source is a simple byte array. The exception stack shows that when I call 'collect' on the context, execution goes into OperatorChain.java:715 and then into KryoSerializer, where it ultimately fails. I have not had a chance to learn what KryoSerializer is or why it would not know what to do with byte[], but that is not the point right now.

I then ran my local test under a debugger to figure out how it manages to work there. I saw that after OperatorChain.java:715 execution goes into BytePrimitiveArraySerializer, and everything works as expected. BytePrimitiveArraySerializer obviously makes sense for byte[] data.
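
For illustration only, here is a stdlib-only sketch of what a dedicated byte[] serializer roughly amounts to: a length prefix followed by the raw bytes. This is not Flink's code. The class and method names (ByteArrayRoundTrip, serialize, deserialize) are made up for this example, and the assumption is that BytePrimitiveArraySerializer does something along these lines.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical helper, not part of Flink: length-prefixed byte[] round trip.
public class ByteArrayRoundTrip {

    // Write the array length as a 4-byte int, then the raw bytes.
    static byte[] serialize(byte[] record) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            out.writeInt(record.length);
            out.write(record);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Read the length prefix, then exactly that many bytes into a fresh array.
    static byte[] deserialize(byte[] wire) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire))) {
            int length = in.readInt();
            byte[] record = new byte[length];
            in.readFully(record);
            return record;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] payload = {1, 2, 3, 4, 5};
        byte[] wire = serialize(payload);
        byte[] copy = deserialize(wire);
        System.out.println(wire.length);                  // 4-byte prefix + 5 payload bytes
        System.out.println(Arrays.equals(payload, copy)); // same contents
        System.out.println(copy != payload);              // but a distinct array instance
    }
}
```

Nothing here needs class registration or reflection, which is why a serializer of this kind would be expected to handle byte[] without surprises.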

The question is: how can I configure the execution environment on the cluster so that it serializes data the same way as the local one? I looked at [https://ci.apache.org/projects/flink/flink-docs-stable/dev/execution_configuration.html] and considered calling disableForceKryo, but the documentation says forcing Kryo is disabled by default anyway.

Another question: why does the cluster execution environment have different default settings compared to the local one? This makes it difficult to rely on local tests.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)