ElasticsearchSink Exception

Govindarajan Srinivasaraghavan
Hi All,

I'm getting the exception below when I start my Flink job. I have verified
the Elasticsearch host and it seems to be working well. I have also tried
adding the dependencies below to my project, but nothing works. I need some
help. Thanks.

compile group: 'org.apache.lucene', name: 'lucene-core', version: '5.5.0'
compile group: 'org.elasticsearch', name: 'elasticsearch', version: '2.3.5'


*Sink Code:*

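List<InetSocketAddress> transportAddresses = new ArrayList<>();
// 9300 is the default Elasticsearch transport port (9200 is the HTTP port)
transportAddresses.add(new InetSocketAddress(InetAddress.getByName(hostName), 9300));

output.addSink(new ElasticsearchSink<>(config, transportAddresses,
        new ElasticsearchSinkFunction<Object>() {
            // process(...) implementation omitted in the original post
        }));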


*Exception:*

java.lang.RuntimeException: Client is not connected to any Elasticsearch nodes!
        at org.apache.flink.streaming.connectors.elasticsearch2.ElasticsearchSink.open(ElasticsearchSink.java:172)
        at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:112)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:386)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:262)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:655)
        at java.lang.Thread.run(Thread.java:745)

Re: ElasticsearchSink Exception

Flavio Pompermaier
Are you sure that in elasticsearch.yml you've enabled ES to listen on the
transport (TCP) port 9300?
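
Before looking at elasticsearch.yml, it may help to rule out plain network reachability from the machine that runs the TaskManager. The following is a minimal sketch using only the JDK; the class name `TransportPortCheck`, the `localhost` default and the 2-second timeout are assumptions for illustration, not part of the original post. It only shows that something accepts TCP connections on the port, not that it is a compatible Elasticsearch node.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TransportPortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Refused, timed out, or unresolvable host: all count as "not reachable".
            return false;
        }
    }

    public static void main(String[] args) {
        // "localhost" is a placeholder; pass the real Elasticsearch host as the first argument.
        String host = args.length > 0 ? args[0] : "localhost";
        System.out.println("transport port 9300 reachable: " + canConnect(host, 9300, 2000));
        System.out.println("HTTP port 9200 reachable: " + canConnect(host, 9200, 2000));
    }
}
```

If 9200 is reachable but 9300 is not, the transport port is probably blocked or bound to a different address; if both are reachable, the cause is more likely on the client side.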

On 25 Feb 2017 08:58, "Govindarajan Srinivasaraghavan" <
[hidden email]> wrote:

> Hi All,
>
> I'm getting the exception below when I start my Flink job. I have verified
> the Elasticsearch host and it seems to be working well. I have also tried
> adding the dependencies below to my project, but nothing works. I need some
> help. Thanks.

Re: ElasticsearchSink Exception

Govindarajan Srinivasaraghavan
Hi Flavio,

I tried both HTTP port 9200 and TCP port 9300, and I see incoming
connections on the Elasticsearch node. I also see the errors below in the
TaskManager .out logs. The dependencies of my Gradle project are listed
below. Am I missing something?

Exception in thread "elasticsearch[Madame Menace][generic][T#2]" java.lang.NoClassDefFoundError: org/elasticsearch/index/mapper/MapperParsingException
        at org.elasticsearch.ElasticsearchException.<clinit>(ElasticsearchException.java:597)
        at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.index.mapper.MapperParsingException
        at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        ... 5 more


Exception in thread "elasticsearch[Saint Elmo][generic][T#2]" java.lang.NoClassDefFoundError: Could not initialize class org.elasticsearch.transport.NodeDisconnectedException
        at org.elasticsearch.transport.TransportService$Adapter$3.run(TransportService.java:622)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)


compile group: 'org.apache.flink', name: 'flink-core', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-streaming-java_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-java', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-connector-kafka-0.10_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-clients_2.10', version: '1.2.0'
compile group: 'org.apache.flink', name: 'flink-connector-elasticsearch2_2.10', version: '1.2.0'


On Sat, Feb 25, 2017 at 1:26 AM, Flavio Pompermaier <[hidden email]>
wrote:

> Are you sure that in elasticsearch.yml you've enabled ES to listen on the
> transport (TCP) port 9300?

Re: ElasticsearchSink Exception

Flavio Pompermaier
The exception you have (NoClassDefFoundError:
org/elasticsearch/index/mapper/MapperParsingException) is usually caused by
an Elasticsearch version conflict or by bad shading when creating the uber
jar. Can you check whether one of the two is causing the problem?
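
One way to tell the two causes apart is to ask the class loader where it finds the class, if it finds it at all. The sketch below uses only standard JDK calls; the class name `ClassOriginCheck` and the choice of the thread context class loader are assumptions for illustration. To be meaningful it would need to run inside the job (for example from the `open()` method of a rich function), because the class loader that Flink uses for user code can differ from the one seen by a standalone `main`.

```java
import java.security.CodeSource;

public class ClassOriginCheck {

    /** Describes where a class is loaded from, or reports that it cannot be loaded. */
    public static String describe(String className, ClassLoader loader) {
        try {
            // initialize=false: locate the class without running its static initializers
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            return source == null || source.getLocation() == null
                    ? className + " <- bootstrap/platform class loader"
                    : className + " <- " + source.getLocation();
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " NOT LOADABLE: " + e;
        }
    }

    public static void main(String[] args) {
        ClassLoader loader = Thread.currentThread().getContextClassLoader();
        System.out.println(describe("org.elasticsearch.index.mapper.MapperParsingException", loader));
        System.out.println(describe("org.elasticsearch.ElasticsearchException", loader));
    }
}
```

If the two classes resolve to different jars, a version conflict is the likely cause. If `ElasticsearchException` resolves but `MapperParsingException` does not, the jar that supplies Elasticsearch is probably incomplete or was relocated during shading.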

On 25 Feb 2017 23:13, "Govindarajan Srinivasaraghavan" <
[hidden email]> wrote:

> Hi Flavio,
>
> I tried both HTTP port 9200 and TCP port 9300, and I see incoming
> connections on the Elasticsearch node. I also see the errors below in the
> TaskManager .out logs. The dependencies of my Gradle project are listed
> below. Am I missing something?

Re: ElasticsearchSink Exception

Govindarajan Srinivasaraghavan
Thanks Flavio. I tried multiple versions but still get the same exception,
and I was able to locate the class file inside my jar. Am I missing
something? Thanks for all the help.
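
Finding the class file in the jar does not by itself show that it sits at the path the class loader expects. The following is a minimal sketch for listing every entry that matches the class file name in one or more jars; the class name `JarClassFinder` and the idea of also passing other jars from the TaskManager classpath are assumptions for illustration.

```java
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

public class JarClassFinder {

    /** Returns the jar entries whose path ends with "/" + classFileName. */
    public static List<String> find(File jar, String classFileName) throws IOException {
        List<String> hits = new ArrayList<>();
        try (JarFile jarFile = new JarFile(jar)) {
            Enumeration<JarEntry> entries = jarFile.entries();
            while (entries.hasMoreElements()) {
                String name = entries.nextElement().getName();
                if (name.endsWith("/" + classFileName)) {
                    hits.add(name);
                }
            }
        }
        return hits;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java JarClassFinder <jar> [<jar> ...]
        for (String path : args) {
            for (String hit : find(new File(path), "MapperParsingException.class")) {
                System.out.println(path + ": " + hit);
            }
        }
    }
}
```

The expected entry is `org/elasticsearch/index/mapper/MapperParsingException.class`. A hit under any other prefix would suggest that shading relocated the package, and hits in more than one jar would point to a version conflict.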

On Sat, Feb 25, 2017 at 3:09 PM, Flavio Pompermaier <[hidden email]>
wrote:

> The exception you have (NoClassDefFoundError:
> org/elasticsearch/index/mapper/MapperParsingException) is usually caused by
> an Elasticsearch version conflict or by bad shading when creating the uber
> jar. Can you check whether one of the two is causing the problem?

Re: ElasticsearchSink Exception

Aljoscha Krettek-2
+Tzu-Li (Gordon) Tai <[hidden email]> Do you have any idea what could
be causing this? I'm asking because you recently worked on the
Elasticsearch connectors, right?

On Sun, 26 Feb 2017 at 04:26 Govindarajan Srinivasaraghavan <
[hidden email]> wrote:

> Thanks Flavio. I tried multiple versions but still get the same exception,
> and I was able to locate the class file inside my jar. Am I missing
> something? Thanks for all the help.