[jira] [Created] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[jira] [Created] (FLINK-11774) IllegalArgumentException in HeapPriorityQueueSet

Shang Yuanchun (Jira)
Kirill Vainer created FLINK-11774:
-------------------------------------

             Summary: IllegalArgumentException in HeapPriorityQueueSet
                 Key: FLINK-11774
                 URL: https://issues.apache.org/jira/browse/FLINK-11774
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.7.2
         Environment: Can reproduce on the following configurations:

 

OS: macOS 10.14.3

Java: 1.8.0_202

 

OS: CentOS 7.2.1511

Java: 1.8.0_102
            Reporter: Kirill Vainer
         Attachments: flink-bug-dist.zip, flink-bug-src.zip

Hi,

I encountered the following exception:
{code}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
        at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
        at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
        at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
        at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
        at flink.bug.App.main(App.java:21)
Caused by: java.lang.IllegalArgumentException
        at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
        at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
        at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
        at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
        at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
        at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
        at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
        at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
        at java.lang.Thread.run(Thread.java:748)
{code}

Code that reproduces the problem:
{code:java}
package flink.bug;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setParallelism(2);

        env.fromElements(1, 2)
            .map(Aggregate::new)
            .keyBy(Aggregate::getKey)
            .timeWindow(Time.seconds(2))
            .reduce(Aggregate::reduce)
            .addSink(new CollectSink());

        env.execute();
    }

    private static class Aggregate {

        private Key key = new Key();

        public Aggregate(long number) {
        }

        public static Aggregate reduce(Aggregate a, Aggregate b) {
            return new Aggregate(0);
        }

        public Key getKey() {
            return key;
        }

    }

    public static class Key {
    }

    private static class CollectSink implements SinkFunction<Aggregate> {

        private static final long serialVersionUID = 1;

        @SuppressWarnings("rawtypes")
        @Override
        public void invoke(Aggregate value, Context ctx) throws Exception {
        }
    }
}
{code}

Attached is the project that can be executed with {{./gradlew run}} showing the problem, or you can run the attached {{flink-bug-dist.zip}} which is prepackaged with the dependencies.

Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)