Kirill Vainer created FLINK-11774:
-------------------------------------

             Summary: IllegalArgumentException in HeapPriorityQueueSet
                 Key: FLINK-11774
                 URL: https://issues.apache.org/jira/browse/FLINK-11774
             Project: Flink
          Issue Type: Bug
          Components: API / DataStream
    Affects Versions: 1.7.2
         Environment: Can reproduce on the following configurations:

OS: macOS 10.14.3
Java: 1.8.0_202

OS: CentOS 7.2.1511
Java: 1.8.0_102
            Reporter: Kirill Vainer
         Attachments: flink-bug-dist.zip, flink-bug-src.zip


Hi,

I encountered the following exception:
{code}
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:146)
    at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:647)
    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1510)
    at flink.bug.App.main(App.java:21)
Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.globalKeyGroupToLocalIndex(HeapPriorityQueueSet.java:158)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForKeyGroup(HeapPriorityQueueSet.java:147)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.getDedupMapForElement(HeapPriorityQueueSet.java:154)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:121)
    at org.apache.flink.runtime.state.heap.HeapPriorityQueueSet.add(HeapPriorityQueueSet.java:49)
    at org.apache.flink.streaming.api.operators.InternalTimerServiceImpl.registerProcessingTimeTimer(InternalTimerServiceImpl.java:197)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerProcessingTimeTimer(WindowOperator.java:876)
    at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:36)
    at org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger.onElement(ProcessingTimeTrigger.java:28)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onElement(WindowOperator.java:895)
    at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:396)
    at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:105)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
{code}

Code that reproduces the problem:
{code:java}
package flink.bug;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.windowing.time.Time;

public class App {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        env.fromElements(1, 2)
            .map(Aggregate::new)
            .keyBy(Aggregate::getKey)
            .timeWindow(Time.seconds(2))
            .reduce(Aggregate::reduce)
            .addSink(new CollectSink());
        env.execute();
    }

    private static class Aggregate {

        private Key key = new Key();

        public Aggregate(long number) {
        }

        public static Aggregate reduce(Aggregate a, Aggregate b) {
            return new Aggregate(0);
        }

        public Key getKey() {
            return key;
        }
    }

    public static class Key {
    }

    private static class CollectSink implements SinkFunction<Aggregate> {

        private static final long serialVersionUID = 1;

        @SuppressWarnings("rawtypes")
        @Override
        public void invoke(Aggregate value, Context ctx) throws Exception {
        }
    }
}
{code}

Attached is the project that can be executed with {{./gradlew run}} showing the problem, or you can run the attached {{flink-bug-dist.zip}} which is prepackaged with the dependencies.

Thanks in advance



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
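
A possible explanation, offered as an assumption rather than a confirmed diagnosis: the {{Key}} class in the reproduction does not override {{hashCode()}} or {{equals()}}, so it falls back to the identity-based implementation in {{Object}}. Flink's documentation states that a POJO type relying on {{Object.hashCode()}} cannot be used as a key, because a copy of the key made on another task generally hashes differently from the original. The sketch below has no Flink dependency and only illustrates that effect. {{IdentityKey}}, {{StableKey}} and {{toyKeyGroup}} are hypothetical names, and {{toyKeyGroup}} is a simplified stand-in, not Flink's actual key-group assignment.

```java
public class KeyHashSketch {

    // Same shape as the Key class in the report: no equals()/hashCode(),
    // so both fall back to Object's identity-based implementations.
    static final class IdentityKey {
    }

    // Hypothetical alternative: a key whose equality and hash depend only on its value.
    static final class StableKey {
        private final long id;

        StableKey(long id) {
            this.id = id;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof StableKey)) {
                return false;
            }
            return id == ((StableKey) o).id;
        }

        @Override
        public int hashCode() {
            return Long.hashCode(id);
        }
    }

    // Simplified stand-in for mapping a key to a key group (assumption:
    // the real assignment also derives the group from key.hashCode()).
    static int toyKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        // Two separate instances model "the key on the sender" and
        // "the deserialized copy of the key on the receiver".
        IdentityKey sent = new IdentityKey();
        IdentityKey received = new IdentityKey();
        System.out.println("identity keys equal: " + sent.equals(received));
        System.out.println("identity key groups: "
            + toyKeyGroup(sent, 128) + " vs " + toyKeyGroup(received, 128));

        StableKey a = new StableKey(7);
        StableKey b = new StableKey(7);
        System.out.println("stable keys equal: " + a.equals(b));
        System.out.println("stable key groups: "
            + toyKeyGroup(a, 128) + " vs " + toyKeyGroup(b, 128));
    }
}
```

With {{StableKey}}, both instances always land in the same group. With {{IdentityKey}}, the two groups usually differ and can change between runs, which would be consistent with the range check in {{globalKeyGroupToLocalIndex}} failing on the receiving task.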