kryoException : Buffer underflow

kryoException : Buffer underflow

Nam-Luc Tran
Hello,

I came across an error whose exact cause I am unable to trace. Starting from the flink-java-examples module, I have extended the KMeans example to a case where points have 25 coordinates. It follows the exact same structure and transformations as the original example, only with points having 25 coordinates instead of 2.

When the centroids dataset is created directly in the code as follows, the job iterates and executes well:

Centroid25 cent1 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000), -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
Centroid25 cent2 = new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000), -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
DataSet<Centroid25> centroids = env.fromCollection(Arrays.asList(cent1, cent2));

When reading from a CSV file containing the following:
-10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
-1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0

with the following code:
DataSet<Centroid25> centroids = env
        .readCsvFile("file:///home/nltran/res3.csv")
        .fieldDelimiter(",")
        .includeFields("1111111111111111111111111")
        .types(Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class)
        .map(p -> {
                return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
                        p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10,
                        p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19,
                        p.f20, p.f21, p.f22, p.f23, p.f24);
        }).returns("eu.euranova.flink.Centroid25");


I hit the following exception:

02/11/2015 14:58:27 PartialSolution (BulkIteration (Bulk Iteration))(1/1) switched to FAILED
com.esotericsoftware.kryo.KryoException: Buffer underflow
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
        at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
        at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
        at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
        at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
        at java.lang.Thread.run(Thread.java:745)

02/11/2015 14:58:27 Job execution switched to status FAILING.
02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELING
02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELING
02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELING
02/11/2015 14:58:27 DataSink(Print to System.out)(1/1) switched to CANCELED
02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELING
02/11/2015 14:58:27 Sync(BulkIteration (Bulk Iteration))(1/1) switched to CANCELED
02/11/2015 14:58:27 CHAIN Map (Map at main(DoTheKMeans.java:64)) -> Map (Map at main(DoTheKMeans.java:65))(1/1) switched to CANCELED
02/11/2015 14:58:27 Combine (Reduce at main(DoTheKMeans.java:68))(1/1) switched to CANCELED
02/11/2015 14:58:27 CHAIN Reduce(Reduce at main(DoTheKMeans.java:68)) -> Map (Map at main(DoTheKMeans.java:71))(1/1) switched to CANCELED
02/11/2015 14:58:27 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: com.esotericsoftware.kryo.KryoException: Buffer underflow
        at org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(NoFetchingInput.java:76)
        at com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
        at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:109)
        at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:205)
        at org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(KryoSerializer.java:210)
        at org.apache.flink.runtime.io.disk.InputViewIterator.next(InputViewIterator.java:43)
        at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
        at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496)
        at org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(AbstractIterativePactTask.java:138)
        at org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(IterationHeadPactTask.java:324)
        at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:360)
        at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:204)
        at java.lang.Thread.run(Thread.java:745)

        at org.apache.flink.runtime.client.JobClientListener$$anonfun$receiveWithLogMessages$2.applyOrElse(JobClient.scala:88)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
        at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:37)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:30)
        at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
        at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:30)
        at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
        at org.apache.flink.runtime.client.JobClientListener.aroundReceive(JobClient.scala:74)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
        at akka.actor.ActorCell.invoke(ActorCell.scala:487)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
        at akka.dispatch.Mailbox.run(Mailbox.scala:221)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The Centroid25 data is exactly the same in both cases. Could you help me trace what is wrong?

Thanks and best regards,

Tran Nam-Luc

Re: kryoException : Buffer underflow

Stephan Ewen
Hi Tran Nam-Luc!

That is a problem we will look into.

In the meantime, can you try to modify your object such that it is a "Flink
POJO"? Then we will serialize it ourselves, without involving Kryo. To do
that, make sure that
 - The class is public
 - It has a public no-argument constructor
 - All fields are either public, or have public getters and setters (see
the sketch below)
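
For illustration, a minimal sketch of a class that satisfies these rules
(a hypothetical shape, for illustration only; your actual Centroid25 may
look different):

// Hypothetical sketch of a Flink-POJO-compliant class; the array field is
// illustrative, not taken from the real Centroid25.
public class Centroid25 {

    // All fields are public (alternatively: private with public getters/setters).
    public int id;
    public double[] coordinates;

    // Public no-argument constructor, needed so Flink can instantiate the class.
    public Centroid25() {}

    public Centroid25(int id, double... coordinates) {
        this.id = id;
        this.coordinates = coordinates;
    }
}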

Here are some minor pointers for the program:
 - If you include all CSV fields, you do not need the
.includeFields("1111111111111111111111111") call. The "includeFields"
function is only necessary if you want to skip over some fields.
 - If the lambda map function returns a simple class without generic
parameters, you do not need the .returns("eu.euranova.flink.Centroid25")
call. It should work even without it (see the sketch below).
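
Taken together, the read could then be shortened to roughly the following
(an untested sketch, assuming the same imports and Centroid25 constructor
as in your snippet):

DataSet<Centroid25> centroids = env
        .readCsvFile("file:///home/nltran/res3.csv")
        .fieldDelimiter(",")
        // includeFields(...) dropped: all 25 columns are read anyway
        .types(Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class,
                Double.class, Double.class, Double.class, Double.class, Double.class)
        // returns(...) dropped: Centroid25 carries no generic parameters
        .map(p -> new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
                p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10,
                p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19,
                p.f20, p.f21, p.f22, p.f23, p.f24));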

Greetings,
Stephan





Re: kryoException : Buffer underflow

Nam-Luc Tran
Hello Stephan, 

Thank you for your help.

I ensured that all the POJO classes used comply with what you previously
said, and the same exception occurs. Here is the listing of the classes
Centroid25 and Point25:

public class Centroid25 extends Point25 {

    public int id;

    public Centroid25() {}

    public Centroid25(int id, Double value0, Double value1, Double value2, Double value3,
            Double value4, Double value5, Double value6, Double value7, Double value8,
            Double value9, Double value10, Double value11, Double value12, Double value13,
            Double value14, Double value15, Double value16, Double value17, Double value18,
            Double value19, Double value20, Double value21, Double value22, Double value23,
            Double value24) {
        super(value0, value1, value2, value3, value4, value5, value6, value7, value8,
                value9, value10, value11, value12, value13, value14, value15, value16,
                value17, value18, value19, value20, value21, value22, value23, value24);
        this.id = id;
    }

    public Centroid25(int id, Point25 p) {
        super(p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10, p.f11,
                p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19, p.f20, p.f21,
                p.f22, p.f23, p.f24);
        this.id = id;
    }

    public Centroid25(int id, Tuple25 p) {
        super(p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10, p.f11,
                p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19, p.f20, p.f21,
                p.f22, p.f23, p.f24);
        this.id = id;
    }

    @Override
    public String toString() {
        return id + " " + super.toString();
    }
}

public class Point25 {

    public Double f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11, f12, f13,
            f14, f15, f16, f17, f18, f19, f20, f21, f22, f23, f24 = 0.0;

    public Point25() {
    }

    public Point25(Double value0, Double value1, Double value2, Double value3,
            Double value4, Double value5, Double value6, Double value7, Double value8,
            Double value9, Double value10, Double value11, Double value12, Double value13,
            Double value14, Double value15, Double value16, Double value17, Double value18,
            Double value19, Double value20, Double value21, Double value22, Double value23,
            Double value24) {
        f0 = value0;
        f1 = value1;
        f2 = value2;
        f3 = value3;
        f4 = value4;
        f5 = value5;
        f6 = value6;
        f7 = value7;
        f8 = value8;
        f9 = value9;
        f10 = value10;
        f11 = value11;
        f12 = value12;
        f13 = value13;
        f14 = value14;
        f15 = value15;
        f16 = value16;
        f17 = value17;
        f18 = value18;
        f19 = value19;
        f20 = value20;
        f21 = value21;
        f22 = value22;
        f23 = value23;
        f24 = value24;
    }

    public List getFieldsAsList() {
        return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
                f12, f13, f14, f15, f16, f17, f18, f19, f20, f21, f22, f23, f24);
    }

    public Point25 add(Point25 other) {
        f0 += other.f0;
        f1 += other.f1;
        f2 += other.f2;
        f3 += other.f3;
        f4 += other.f4;
        f5 += other.f5;
        f6 += other.f6;
        f7 += other.f7;
        f8 += other.f8;
        f9 += other.f9;
        f10 += other.f10;
        f11 += other.f11;
        f12 += other.f12;
        f13 += other.f13;
        f14 += other.f14;
        f15 += other.f15;
        f16 += other.f16;
        f17 += other.f17;
        f18 += other.f18;
        f19 += other.f19;
        f20 += other.f20;
        f21 += other.f21;
        f22 += other.f22;
        f23 += other.f23;
        f24 += other.f24;
        return this;
    }

    public Point25 div(long val) {
        f0 /= val;
        f1 /= val;
        f2 /= val;
        f3 /= val;
        f4 /= val;
        f5 /= val;
        f6 /= val;
        f7 /= val;
        f8 /= val;
        f9 /= val;
        f10 /= val;
        f11 /= val;
        f12 /= val;
        f13 /= val;
        f14 /= val;
        f15 /= val;
        f16 /= val;
        f17 /= val;
        f18 /= val;
        f19 /= val;
        f20 /= val;
        f21 /= val;
        f22 /= val;
        f23 /= val;
        f24 /= val;
        return this;
    }

    public double euclideanDistance(Point25 other) {
        List l = this.getFieldsAsList();
        List ol = other.getFieldsAsList();
        double res = 0;
        // Loop body reconstructed: the archive truncated the listing after
        // "for(int i=0;i"; a standard sum of squared differences is assumed.
        for (int i = 0; i < l.size(); i++) {
            double d = (Double) l.get(i) - (Double) ol.get(i);
            res += d * d;
        }
        return Math.sqrt(res);
    }
}

Re: kryoException : Buffer underflow

Timo Walther
Hey Nam-Luc,

I think your problem lies in the following line:

.returns("eu.euranova.flink.Centroid25")

If you do not specify the fields of the class in the String by using
"<myfield=String,otherField=int>", the underlying parser will create a
"GenericTypeInfo" type information, which then uses Kryo for serialization.

In general, lambda expressions are a very new feature that currently
causes a lot of problems because compilers do not preserve the full type
information. It may be better to use (anonymous) classes instead; see the
sketch below.
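
For example, the map step could be written with an anonymous class roughly
as follows (an untested sketch, reusing the Centroid25 constructor from the
code earlier in this thread; it assumes that
org.apache.flink.api.common.functions.MapFunction and
org.apache.flink.api.java.tuple.Tuple25 are imported):

.map(new MapFunction<Tuple25<Double, Double, Double, Double, Double,
        Double, Double, Double, Double, Double,
        Double, Double, Double, Double, Double,
        Double, Double, Double, Double, Double,
        Double, Double, Double, Double, Double>, Centroid25>() {
    @Override
    public Centroid25 map(Tuple25<Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double,
            Double, Double, Double, Double, Double> p) {
        // The generic parameters of an anonymous class are kept in the class
        // file, so Flink can extract the return type without a returns(...) hint.
        return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
                p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9, p.f10,
                p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18, p.f19,
                p.f20, p.f21, p.f22, p.f23, p.f24);
    }
})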

In case of "map()" functions you don't need to provide type hints
through the "returns()" method.

For other operators you need to either specify all fields of the class in
the String (which makes no sense in your case) or change the call to
.returns(Centroid25.class)

I hope that helps.

Regards,
Timo


Re: kryoException : Buffer underflow

Stephan Ewen
@Timo If I understand it correctly, either omitting the "returns(...)"
statement or changing it to "returns(Centroid25.class)" would help?

I think the behavior of "returns(Centroid25.class)" and
"returns("eu.euranova.flink.Centroid25")" should be consistent, in that
both handle the type as a POJO.

Stephan


On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <[hidden email]> wrote:

> Hey Nam-Luc,
>
> I think your problem lies in the following line:
>
> .returns("eu.euranova.flink.Centroid25")
>
> If you do not specify the fields of the class in the String by using
> "<myfield=String,otherField=int>", the underlying parser will create an
> "GenericTypeInfo" type information which then uses Kryo for serialization.
>
> In general, lambda expressions are a very new feature which currently
> makes a lot of problems due to missing type information by compilers. Maybe
> it is better to use (anonymous) classes instead.
>
> In case of "map()" functions you don't need to provide type hints through
> the "returns()" method.
>
> For other operators you need to either specify all fields of the class in
> the String (makes no sense in you case) or you change the method to
>
> .returns(Centroid25.class)
>
> I hope that helps.
>
> Regards,
> Timo
>
>
> On 11.02.2015 17:38, Nam-Luc Tran wrote:
>
>> Hello Stephan,
>>
>> Thank you for your help.
>>
>> I ensured all the POJO classes used comply to what you previously said
>> and the same exception occurs. Here is the listing of classes
>> Centroid25 and Point25:
>>
>> public class Centroid25 extends Point25 {
>>
>> public int id;
>>
>> public Centroid25() {}
>>
>> public Centroid25(int id, Double value0, Double value1, Double value2,
>> Double value3, Double value4, Double value5,
>> Double value6, Double value7, Double value8, Double value9, Double
>> value10, Double value11, Double value12,
>> Double value13, Double value14, Double value15, Double value16, Double
>> value17, Double value18,
>> Double value19, Double value20, Double value21, Double value22, Double
>> value23, Double value24) {
>> super(value0, value1, value2, value3, value4, value5, value6, value7,
>> value8, value9, value10, value11,
>> value12, value13, value14, value15, value16, value17, value18,
>> value19, value20, value21, value22,
>> value23, value24);
>> this.id = id;
>> }
>>
>> public Centroid25(int id, Point25 p) {
>> super(p.f0,
>> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
>> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
>> f22,p.f23,p.f24);
>> this.id = id;
>> }
>>
>> public Centroid25(int id, Tuple25 p) {
>> super(p.f0,
>> p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.f11,p.
>> f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.f21,p.
>> f22,p.f23,p.f24);
>> this.id = id;
>> }
>>
>> @Override
>> public String toString() {
>> return id + " " + super.toString();
>> }
>> }
>>
>> public class Point25{
>>
>> public Double
>> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
>> f17,f18,f19,f20,f21,f22,f23,f24
>> = 0.0;
>>
>> public Point25() {
>> }
>>
>> public Point25(Double value0, Double value1, Double value2, Double
>> value3, Double value4, Double value5,
>> Double value6, Double value7, Double value8, Double value9, Double
>> value10, Double value11, Double value12,
>> Double value13, Double value14, Double value15, Double value16, Double
>> value17, Double value18,
>> Double value19, Double value20, Double value21, Double value22, Double
>> value23, Double value24) {
>> f0=value0;
>> f1=value1;
>> f2=value2;
>> f3=value3;
>> f4=value4;
>> f5=value5;
>> f6=value6;
>> f7=value7;
>> f8=value8;
>> f9=value9;
>> f10=value10;
>> f11=value11;
>> f12=value12;
>> f13=value13;
>> f14=value14;
>> f15=value15;
>> f16=value16;
>> f17=value17;
>> f18=value18;
>> f19=value19;
>> f20=value20;
>> f21=value21;
>> f22=value22;
>> f23=value23;
>> f24=value24;
>>
>> }
>>
>> public List getFieldsAsList() {
>> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
>> f12, f13, f14, f15, f16, f17, f18, f19,
>> f20, f21, f22, f23, f24);
>> }
>>
>> public Point25 add(Point25 other) {
>> f0 += other.f0;
>> f1 += other.f1;
>> f2 += other.f2;
>> f3 += other.f3;
>> f4 += other.f4;
>> f5 += other.f5;
>> f6 += other.f6;
>> f7 += other.f7;
>> f8 += other.f8;
>> f9 += other.f9;
>> f10 += other.f10;
>> f11 += other.f11;
>> f12 += other.f12;
>> f13 += other.f13;
>> f14 += other.f14;
>> f15 += other.f15;
>> f16 += other.f16;
>> f17 += other.f17;
>> f18 += other.f18;
>> f19 += other.f19;
>> f20 += other.f20;
>> f21 += other.f21;
>> f22 += other.f22;
>> f23 += other.f23;
>> f24 += other.f24;
>> return this;
>> }
>>
>> public Point25 div(long val) {
>> f0 /= val;
>> f1 /= val;
>> f2 /= val;
>> f3 /= val;
>> f4 /= val;
>> f5 += val;
>> f6 += val;
>> f7 += val;
>> f8 += val;
>> f9 += val;
>> f10 += val;
>> f11 += val;
>> f12 += val;
>> f13 += val;
>> f14 += val;
>> f15 += val;
>> f16 += val;
>> f17 += val;
>> f18 += val;
>> f19 += val;
>> f20 += val;
>> f21 += val;
>> f22 += val;
>> f23 += val;
>> f24 += val;
>> return this;
>> }
>>
>> public double euclideanDistance(Point25 other) {
>> List l = this.getFieldsAsList();
>> List ol = other.getFieldsAsList();
>> double res = 0;
>> for(int i=0;i
>>
>>> I came accross an error for which I am unable to retrace the exact
>>>
>> cause.
>>
>>> Starting from flink-java-examples module, I have extended the KMeans
>>> example
>>> to a case where points have 25 coordinates. It follows the exact
>>>
>> same
>>
>>> structure and transformations as the original example, only with
>>>
>> points
>>
>>> having 25 coordinates instead of 2.
>>>
>>> When creating the centroids dataset within the code as follows the
>>>
>> job
>>
>>> iterates and executes well:
>>>
>>> Centroid25 cent1 = new
>>>
>> Centroid25(ThreadLocalRandom.current().nextInt(0,
>>
>>> 1000),
>>>
>>>
>>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
>> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0);
>>
>>> Centroid25 cent2 = new
>>>
>> Centroid25(ThreadLocalRandom.current().nextInt(0,
>>
>>> 1000),
>>>
>>>
>>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0);
>>
>>> DataSet centroids = env.fromCollection(Arrays.asList(cent1,
>>> cent2));
>>>
>>> When reading from a csv file containing the following:
>>>
>>>
>>>  -10.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-0.0,
>> 0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,10.0
>>
>>>
>>>  -1.0,-1.0,1.0,1.0,-1.0,1.0,1.0,1.0,1.0,1.0,1.0,-1.0,1.0,1.
>> 0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0,1.0
>>
>>> with the following code:
>>> DataSet> centroids = env
>>>
>>> .readCsvFile("file:///home/nltran/res3.csv")
>>>
>>>
>> .fieldDelimiter(",")
>>
>>>
>>>
>> .includeFields("1111111111111111111111111")
>>
>>>
>>>
>> .types(Double.class, Double.class,
>>
>>> Double.class, Double.class,
>>> Double.class, Double.class,
>>>
>>>
>> Double.class,
>>
>>> Double.class, Double.class, Double.class, Double.class,
>>> Double.class,
>>>
>>>
>> Double.class,
>>
>>> Double.class, Double.class, Double.class, Double.class,
>>> Double.class,
>>>
>>>
>> Double.class,
>>
>>> Double.class, Double.class, Double.class, Double.class,
>>> Double.class,
>>>
>>>
>> Double.class).map(p -> {
>>
>>>
>>>
>> return new
>>
>>> Centroid25(ThreadLocalRandom.current().nextInt(0, 1000),
>>>
>>>
>>>  p.f0,p.f1,p.f2,p.f3,p.f4,p.f5,p.f6,p.f7,p.f8,p.f9,p.f10,p.
>> f11,p.f12,p.f13,p.f14,p.f15,p.f16,p.f17,p.f18,p.f19,p.f20,p.
>> f21,p.f22,p.f23,p.f24);
>>
>>>
>>>
>> }).returns("eu.euranova.flink.Centroid25");
>>
>>>
>>> I hit the following exception:
>>>
>>> 02/11/2015 14:58:27     PartialSolution (BulkIteration (Bulk
>>> Iteration))(1/1)
>>> switched to FAILED
>>> com.esotericsoftware.kryo.KryoException: Buffer underflow
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.NoFetchingInput.require(
>> NoFetchingInput.java:76)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.io.Input.readVarInt(Input.java:355)
>>
>>>           at
>>>
>>>
>>>  com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(
>> DefaultClassResolver.java:109)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
>>
>>>           at
>>>
>> com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
>> KryoSerializer.java:205)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.api.java.typeutils.runtime.KryoSerializer.deserialize(
>> KryoSerializer.java:210)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.io.disk.InputViewIterator.next(
>> InputViewIterator.java:43)
>>
>>>           at
>>>
>>>  org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:91)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.operators.RegularPactTask.run(
>> RegularPactTask.java:496)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.iterative.task.AbstractIterativePactTask.run(
>> AbstractIterativePactTask.java:138)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.iterative.task.IterationHeadPactTask.run(
>> IterationHeadPactTask.java:324)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.operators.RegularPactTask.
>> invoke(RegularPactTask.java:360)
>>
>>>           at
>>>
>>>
>>>  org.apache.flink.runtime.execution.RuntimeEnvironment.
>> run(RuntimeEnvironment.java:204)
>>
>>>           at java.lang.Thread.run(Thread.java:745)
>>> The centroid25 data is exactly the same in both cases. Could you help me
>>> retrace what is wrong?
>>>
>>> Thanks and best regards,
>>>
>>> Tran Nam-Luc

Re: kryoException : Buffer underflow

Robert Metzger
I think the issue is that the returns("eu.euranova.flink.Centroid25")
variant only passes a string and the system does not know the type
parameters. So we have to put GenericTypeInfo there, because we basically
see Objects.
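
As a minimal sketch of the difference (the pipeline around the map() is
invented for illustration; only Centroid25 and its package name come from
this thread):

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import eu.euranova.flink.Centroid25;

public class ReturnsHintSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // A bare class-name string carries no field information, so the
        // parser falls back to a GenericTypeInfo and the records go
        // through Kryo.
        DataSet<Centroid25> viaString = env.fromElements(1, 2, 3)
                .map(i -> new Centroid25())
                .returns("eu.euranova.flink.Centroid25");

        // A class object is analyzed by the type extractor, which
        // recognizes Centroid25 as a POJO and avoids Kryo.
        DataSet<Centroid25> viaClass = env.fromElements(1, 2, 3)
                .map(i -> new Centroid25())
                .returns(Centroid25.class);

        viaClass.print();
        env.execute();
    }
}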

On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <[hidden email]> wrote:

> @Timo If I understand it correctly, both omitting the "returns(...)"
> statement, or changing it to "returns(Centroid25.class)" would help?
>
> I think that the behavior between "returns(Centroid25.class)" and "
> returns("eu.euranova.flink.Centroid25")" should be consistent in that they
> both handle the type as a POJO.
>
> Stephan

Re: kryoException : Buffer underflow

Stephan Ewen
But in this case, there are no type parameters, correct? Centroid25 is not
a generic class...
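
(For contrast, an invented example of a class that does have a type
parameter; for such a type a bare Class object could not fully describe
the element type, whereas Centroid25 is a plain, non-generic POJO:)

// Invented illustration: a class WITH a type parameter. Here returns(Class)
// alone would leave T unknown, which is where type parameters would matter.
public class Wrapper<T> {
    public T value;

    public Wrapper() {}
}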

On Wed, Feb 11, 2015 at 9:40 PM, Robert Metzger <[hidden email]> wrote:

> I think the issue is that the returns("eu.euranova.flink.Centroid25")
> variant only passes a string and the system does not know the type
> parameters. So we have to put GenericTypeInfo there, because we basically
> see Objects.

Re: kryoException : Buffer underflow

Timo Walther-2
@Stephan: Yes, you are correct. Both omitting the "returns(...)"
statement and changing it to "returns(Centroid25.class)" would help.

The returns(TypeInformation) and returns(String) methods do absolutely
no type extraction; the user has to know what he is doing. The methods'
documentation reads:

Pojo types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>
Generic types such as java.lang.Class

With the returns(String) method you can create all types of type
information we currently support.

For returns(Class), the description is as follows:

This method takes a class that will be analyzed by Flink's type
extraction capabilities.
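
A hedged sketch of these hint styles side by side; MyPojo, its package,
and the pipeline are invented here, since spelling out all 25 Centroid25
fields in the string form would be impractical:

package org.example;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Invented stand-in POJO: public no-argument constructor, public fields.
public class MyPojo {
    public String myFieldName;
    public int myFieldName2;

    public MyPojo() {}

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 1) Bare class-name string: no fields given, so it is parsed as a
        //    generic type and serialized with Kryo (the failure mode above).
        DataSet<MyPojo> generic = env.fromElements("a", "b")
                .map(s -> new MyPojo())
                .returns("org.example.MyPojo");

        // 2) Class-name string with fields, following the documented pattern
        //    org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>; assuming
        //    the parser's shorthand for basic types, this yields a POJO type.
        DataSet<MyPojo> pojoFromString = env.fromElements("a", "b")
                .map(s -> new MyPojo())
                .returns("org.example.MyPojo<myFieldName=String,myFieldName2=int>");

        // 3) Class object: analyzed by the type extractor and recognized
        //    as a POJO.
        DataSet<MyPojo> pojoFromClass = env.fromElements("a", "b")
                .map(s -> new MyPojo())
                .returns(MyPojo.class);

        pojoFromClass.print();
        env.execute();
    }
}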


On 11.02.2015 21:42, Stephan Ewen wrote:

> But in this case, there are no type parameters, correct? Centroid25 is not
> a generic class...


AW: kryoException : Buffer underflow

Kirschnick, Johannes
Hi,

I basically just reported an issue and then found this thread on the list about the same error.

Just bringing this up here, in case these issues are linked ...

There is a small test case to reproduce, attached to
https://issues.apache.org/jira/browse/FLINK-1531

I tried to zero in on the code and find the problem - it might be related to type erasure?

It seems that in the mentioned scenario there is a MutableObjectIterator which is iterated, and null is used to signal "no more". Because Kryo is in the mix, it eagerly tries to read the "next" record, which fails with a buffer underflow. So somewhere there should be a hasNext-style check ...
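For context, a minimal sketch of the pattern described here. The interface shape below matches Flink's org.apache.flink.util.MutableObjectIterator; the consuming loop is only an assumed simplification of what a driver does, not Flink's actual code:

    import java.io.IOException;

    public class NullSentinelSketch {
        // Shape of org.apache.flink.util.MutableObjectIterator: next()
        // returns null at end-of-input; there is no hasNext().
        interface MutableObjectIterator<E> {
            E next(E reuse) throws IOException;
        }

        // Assumed driver loop: the null sentinel only works if the
        // deserializer behind next() detects end-of-input itself. If a
        // Kryo-backed reader eagerly asks for more bytes instead, the call
        // fails with "Buffer underflow" before it can return null.
        static <E> void drain(MutableObjectIterator<E> in, E reuse) throws IOException {
            E record;
            while ((record = in.next(reuse)) != null) {
                // process record ...
            }
        }
    }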

Johannes
________________________________________
From: Timo Walther <[hidden email]>
Sent: Wednesday, 11 February 2015 21:55
To: [hidden email]
Subject: Re: kryoException : Buffer underflow

@Stephan: Yes, you are correct. Both omitting the "returns(...)"
statement and changing it to "returns(Centroid25.class)" would help.

The returns(TypeInformation) and returns(String) methods do no type
extraction at all; the user has to know what he is doing. The method
description reads:

Pojo types such as org.my.MyPojo<myFieldName=TYPE0,myFieldName2=TYPE1>
Generic types such as java.lang.Class

With the returns(String) method you can create all kinds of type
information we currently support.

For returns(Class), the description is as follows:

This method takes a class that will be analyzed by Flink's type
extraction capabilities.
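As an illustration, a minimal sketch of the difference between the two hints, reusing the Centroid25 and Point25 classes from Nam-Luc's listing further down (0.8-era API; the integer data source is just a stand-in):

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class ReturnsHintSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<Integer> ids = env.fromElements(1, 2, 3);

            // Class-name string without a field list: the parser builds a
            // GenericTypeInfo, so Centroid25 is serialized with Kryo.
            DataSet<Centroid25> viaKryo = ids
                    .map(i -> new Centroid25(i, new Point25()))
                    .returns("eu.euranova.flink.Centroid25");

            // Class object: Flink's type extraction analyzes Centroid25 and
            // builds a PojoTypeInfo, avoiding the Kryo fallback.
            DataSet<Centroid25> viaPojo = ids
                    .map(i -> new Centroid25(i, new Point25()))
                    .returns(Centroid25.class);

            viaKryo.print();
            viaPojo.print();
            env.execute();
        }
    }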


On 11.02.2015 21:42, Stephan Ewen wrote:

> But in this case, there are no type parameters, correct? Centroid25 is not
> a generic class...
>
> On Wed, Feb 11, 2015 at 9:40 PM, Robert Metzger <[hidden email]> wrote:
>
>> I think the issue is that the returns("eu.euranova.flink.Centroid25")
>> variant only passes a string and the system does not know the type
>> parameters.
>> So we have to put GenericTypeInfo there, because we basically see Objects.
>>
>> On Wed, Feb 11, 2015 at 9:37 PM, Stephan Ewen <[hidden email]> wrote:
>>
>>> @Timo If I understand it correctly, both omitting the "returns(...)"
>>> statement, or changing it to "returns(Centroid25.class)" would help?
>>>
>>> I think that the behavior between "returns(Centroid25.class)" and
>>> "returns("eu.euranova.flink.Centroid25")" should be consistent in that
>>> they both handle the type as a POJO.
>>>
>>> Stephan
>>>
>>>
>>> On Wed, Feb 11, 2015 at 9:28 PM, Timo Walther <[hidden email]> wrote:
>>>> Hey Nam-Luc,
>>>>
>>>> I think your problem lies in the following line:
>>>>
>>>> .returns("eu.euranova.flink.Centroid25")
>>>>
>>>> If you do not specify the fields of the class in the String by using
>>>> "<myfield=String,otherField=int>", the underlying parser will create a
>>>> "GenericTypeInfo" type information, which then uses Kryo for
>>>> serialization.
>>>> In general, lambda expressions are a very new feature which currently
>>>> causes a lot of problems due to missing type information from compilers.
>>>> Maybe it is better to use (anonymous) classes instead.
>>>>
>>>> In case of "map()" functions you don't need to provide type hints
>>>> through the "returns()" method.
>>>>
>>>> For other operators you need to either specify all fields of the class
>>>> in the String (which makes no sense in your case) or change the method to
>>>>
>>>> .returns(Centroid25.class)
>>>>
>>>> I hope that helps.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 11.02.2015 17:38, Nam-Luc Tran wrote:
>>>>
>>>>> Hello Stephan,
>>>>>
>>>>> Thank you for your help.
>>>>>
>>>>> I ensured all the POJO classes used comply with what you previously said
>>>>> and the same exception occurs. Here is the listing of classes
>>>>> Centroid25 and Point25:
>>>>>
>>>>> public class Centroid25 extends Point25 {
>>>>>
>>>>> public int id;
>>>>>
>>>>> public Centroid25() {}
>>>>>
>>>>> public Centroid25(int id, Double value0, Double value1, Double value2,
>>>>> Double value3, Double value4, Double value5,
>>>>> Double value6, Double value7, Double value8, Double value9, Double
>>>>> value10, Double value11, Double value12,
>>>>> Double value13, Double value14, Double value15, Double value16, Double
>>>>> value17, Double value18,
>>>>> Double value19, Double value20, Double value21, Double value22, Double
>>>>> value23, Double value24) {
>>>>> super(value0, value1, value2, value3, value4, value5, value6, value7,
>>>>> value8, value9, value10, value11,
>>>>> value12, value13, value14, value15, value16, value17, value18,
>>>>> value19, value20, value21, value22,
>>>>> value23, value24);
>>>>> this.id = id;
>>>>> }
>>>>>
>>>>> public Centroid25(int id, Point25 p) {
>>>>> super(p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9,
>>>>> p.f10, p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18,
>>>>> p.f19, p.f20, p.f21, p.f22, p.f23, p.f24);
>>>>> this.id = id;
>>>>> }
>>>>>
>>>>> public Centroid25(int id, Tuple25 p) {
>>>>> super(p.f0, p.f1, p.f2, p.f3, p.f4, p.f5, p.f6, p.f7, p.f8, p.f9,
>>>>> p.f10, p.f11, p.f12, p.f13, p.f14, p.f15, p.f16, p.f17, p.f18,
>>>>> p.f19, p.f20, p.f21, p.f22, p.f23, p.f24);
>>>>> this.id = id;
>>>>> }
>>>>>
>>>>> @Override
>>>>> public String toString() {
>>>>> return id + " " + super.toString();
>>>>> }
>>>>> }
>>>>>
>>>>> public class Point25{
>>>>>
>>>>> public Double
>>>>> f0,f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13,f14,f15,f16,
>>>>> f17,f18,f19,f20,f21,f22,f23,f24
>>>>> = 0.0;
>>>>>
>>>>> public Point25() {
>>>>> }
>>>>>
>>>>> public Point25(Double value0, Double value1, Double value2, Double
>>>>> value3, Double value4, Double value5,
>>>>> Double value6, Double value7, Double value8, Double value9, Double
>>>>> value10, Double value11, Double value12,
>>>>> Double value13, Double value14, Double value15, Double value16, Double
>>>>> value17, Double value18,
>>>>> Double value19, Double value20, Double value21, Double value22, Double
>>>>> value23, Double value24) {
>>>>> f0=value0;
>>>>> f1=value1;
>>>>> f2=value2;
>>>>> f3=value3;
>>>>> f4=value4;
>>>>> f5=value5;
>>>>> f6=value6;
>>>>> f7=value7;
>>>>> f8=value8;
>>>>> f9=value9;
>>>>> f10=value10;
>>>>> f11=value11;
>>>>> f12=value12;
>>>>> f13=value13;
>>>>> f14=value14;
>>>>> f15=value15;
>>>>> f16=value16;
>>>>> f17=value17;
>>>>> f18=value18;
>>>>> f19=value19;
>>>>> f20=value20;
>>>>> f21=value21;
>>>>> f22=value22;
>>>>> f23=value23;
>>>>> f24=value24;
>>>>>
>>>>> }
>>>>>
>>>>> public List getFieldsAsList() {
>>>>> return Arrays.asList(f0, f1, f2, f3, f4, f5, f6, f7, f8, f9, f10, f11,
>>>>> f12, f13, f14, f15, f16, f17, f18, f19,
>>>>> f20, f21, f22, f23, f24);
>>>>> }
>>>>>
>>>>> public Point25 add(Point25 other) {
>>>>> f0 += other.f0;
>>>>> f1 += other.f1;
>>>>> f2 += other.f2;
>>>>> f3 += other.f3;
>>>>> f4 += other.f4;
>>>>> f5 += other.f5;
>>>>> f6 += other.f6;
>>>>> f7 += other.f7;
>>>>> f8 += other.f8;
>>>>> f9 += other.f9;
>>>>> f10 += other.f10;
>>>>> f11 += other.f11;
>>>>> f12 += other.f12;
>>>>> f13 += other.f13;
>>>>> f14 += other.f14;
>>>>> f15 += other.f15;
>>>>> f16 += other.f16;
>>>>> f17 += other.f17;
>>>>> f18 += other.f18;
>>>>> f19 += other.f19;
>>>>> f20 += other.f20;
>>>>> f21 += other.f21;
>>>>> f22 += other.f22;
>>>>> f23 += other.f23;
>>>>> f24 += other.f24;
>>>>> return this;
>>>>> }
>>>>>
>>>>> public Point25 div(long val) {
>>>>> f0 /= val;
>>>>> f1 /= val;
>>>>> f2 /= val;
>>>>> f3 /= val;
>>>>> f4 /= val;
>>>>> f5 /= val;
>>>>> f6 /= val;
>>>>> f7 /= val;
>>>>> f8 /= val;
>>>>> f9 /= val;
>>>>> f10 /= val;
>>>>> f11 /= val;
>>>>> f12 /= val;
>>>>> f13 /= val;
>>>>> f14 /= val;
>>>>> f15 /= val;
>>>>> f16 /= val;
>>>>> f17 /= val;
>>>>> f18 /= val;
>>>>> f19 /= val;
>>>>> f20 /= val;
>>>>> f21 /= val;
>>>>> f22 /= val;
>>>>> f23 /= val;
>>>>> f24 /= val;
>>>>> return this;
>>>>> }
>>>>>
>>>>> public double euclideanDistance(Point25 other) {
>>>>> List l = this.getFieldsAsList();
>>>>> List ol = other.getFieldsAsList();
>>>>> double res = 0;
>>>>> for(int i=0;i<l.size();i++) {
>>>>> double d = (Double) l.get(i) - (Double) ol.get(i);
>>>>> res += d * d;
>>>>> }
>>>>> return Math.sqrt(res);
>>>>> }
>>>>> }
>>>>>

Re: AW: kryoException : Buffer underflow

Nam-Luc Tran
In reply to this post by Nam-Luc Tran
Without the .returns(...) statement it yelled about type erasure.
Putting .returns(Centroid25.class) did the trick.

Thanks everyone for your help.

Tran Nam-Luc
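For completeness, a sketch of the other fix discussed above: an anonymous MapFunction instead of a lambda, so Flink's type extraction can read the generic signature and no returns(...) hint is needed at all. The input DataSet<Point25> is an assumed stand-in, built from the classes in the listing further up:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.DataSet;

    import java.util.concurrent.ThreadLocalRandom;

    public class AnonymousMapSketch {
        static DataSet<Centroid25> toCentroids(DataSet<Point25> points) {
            // Anonymous class instead of a lambda: the type arguments of
            // MapFunction<Point25, Centroid25> survive erasure in the class
            // file, so the extractor sees Centroid25 and treats it as a POJO.
            return points.map(new MapFunction<Point25, Centroid25>() {
                @Override
                public Centroid25 map(Point25 p) {
                    return new Centroid25(ThreadLocalRandom.current().nextInt(0, 1000), p);
                }
            });
        }
    }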

At Thursday, 12/02/2015 on 12:06 Kirschnick, Johannes wrote:

Hi,

I basically just reported an issue and then found this thread on the list
about the same error.

Just bringing this up here, in case these issues are linked ...

There is a small test case to reproduce, attached to
https://issues.apache.org/jira/browse/FLINK-1531

I tried to zero in on the code and find the problem - it might be
related to type erasure?

It seems that in the mentioned scenario there is a
MutableObjectIterator which is iterated, and null is used to signal "no
more". Because Kryo is in the mix, it eagerly tries to read the "next"
record, which fails with a buffer underflow. So somewhere there should
be a hasNext-style check ...

Johannes

Re: AW: kryoException : Buffer underflow

Till Rohrmann
The Kryo buffer underflow should be fixed by the PR [1].

[1] https://github.com/apache/flink/pull/391

On Thu, Feb 12, 2015 at 4:10 PM, Nam-Luc Tran <[hidden email]>
wrote:

> Without the .returns(...) statement it yelled about type erasure.
> Putting .returns(Centroid25.class) did the trick.
>
> Thanks everyone for your help.
>
> Tran Nam-Luc
>