Alex Hall created FLINK-19739:
--------------------------------- Summary: CompileException when windowing in batch mode: A method named "replace" is not declared in any enclosing class nor any supertype Key: FLINK-19739 URL: https://issues.apache.org/jira/browse/FLINK-19739 Project: Flink Issue Type: Bug Components: API / Python, Table SQL / API Affects Versions: 1.11.2, 1.12.0 Environment: Ubuntu 18.04 Python 3.8, jar built from master yesterday. Or Python 3.7, installed latest version from pip. Reporter: Alex Hall Example script: ```python from pyflink.table import EnvironmentSettings, BatchTableEnvironment from pyflink.table.window import Tumble env_settings = ( EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() ) table_env = BatchTableEnvironment.create(environment_settings=env_settings) table_env.execute_sql( """ CREATE TABLE table1 ( amount INT, ts TIMESTAMP(3), WATERMARK FOR ts AS ts - INTERVAL '5' SECOND ) WITH ( 'connector.type' = 'filesystem', 'format.type' = 'csv', 'connector.path' = '/home/alex/work/test-flink/data1.csv' ) """ ) table1 = table_env.from_path("table1") table = ( table1 .window(Tumble.over("5.days").on("ts").alias("__window")) .group_by("__window") .select("amount.sum") ) print(table.to_pandas()) ``` Output: ``` WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil (file:/home/alex/work/flink/flink-dist/target/flink-1.12-SNAPSHOT-bin/flink-1.12-SNAPSHOT/opt/flink-python_2.11-1.12-SNAPSHOT.jar) to constructor java.nio.DirectByteBuffer(long,int) WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.python.shaded.io.netty.util.internal.ReflectionUtil WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release /* 1 */ /* 2 */ public class LocalHashWinAggWithoutKeys$59 extends org.apache.flink.table.runtime.operators.TableStreamOperator /* 3 */ implements org.apache.flink.streaming.api.operators.OneInputStreamOperator, org.apache.flink.streaming.api.operators.BoundedOneInput { /* 4 */ /* 5 */ private final Object[] references; /* 6 */ /* 7 */ private static final org.slf4j.Logger LOG$2 = /* 8 */ org.slf4j.LoggerFactory.getLogger("LocalHashWinAgg"); /* 9 */ /* 10 */ private transient org.apache.flink.table.types.logical.LogicalType[] aggMapKeyTypes$5; /* 11 */ private transient org.apache.flink.table.types.logical.LogicalType[] aggBufferTypes$6; /* 12 */ private transient org.apache.flink.table.runtime.operators.aggregate.BytesHashMap aggregateMap$7; /* 13 */ org.apache.flink.table.data.binary.BinaryRowData emptyAggBuffer$9 = new org.apache.flink.table.data.binary.BinaryRowData(1); /* 14 */ org.apache.flink.table.data.writer.BinaryRowWriter emptyAggBufferWriterTerm$10 = new org.apache.flink.table.data.writer.BinaryRowWriter(emptyAggBuffer$9); /* 15 */ org.apache.flink.table.data.GenericRowData hashAggOutput = new org.apache.flink.table.data.GenericRowData(2); /* 16 */ private transient org.apache.flink.table.data.binary.BinaryRowData reuseAggMapKey$17 = new org.apache.flink.table.data.binary.BinaryRowData(1); /* 17 */ private transient org.apache.flink.table.data.binary.BinaryRowData reuseAggBuffer$18 = new org.apache.flink.table.data.binary.BinaryRowData(1); /* 18 */ private transient org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry reuseAggMapEntry$19 = new org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry(reuseAggMapKey$17, reuseAggBuffer$18); /* 19 */ org.apache.flink.table.data.binary.BinaryRowData aggMapKey$3 = new org.apache.flink.table.data.binary.BinaryRowData(1); /* 20 */ org.apache.flink.table.data.writer.BinaryRowWriter aggMapKeyWriter$4 = new org.apache.flink.table.data.writer.BinaryRowWriter(aggMapKey$3); /* 21 */ private boolean hasInput = false; /* 22 */ org.apache.flink.streaming.runtime.streamrecord.StreamRecord element = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord((Object)null); /* 23 */ private final org.apache.flink.streaming.runtime.streamrecord.StreamRecord outElement = new org.apache.flink.streaming.runtime.streamrecord.StreamRecord(null); /* 24 */ /* 25 */ public LocalHashWinAggWithoutKeys$59( /* 26 */ Object[] references, /* 27 */ org.apache.flink.streaming.runtime.tasks.StreamTask task, /* 28 */ org.apache.flink.streaming.api.graph.StreamConfig config, /* 29 */ org.apache.flink.streaming.api.operators.Output output, /* 30 */ org.apache.flink.streaming.runtime.tasks.ProcessingTimeService processingTimeService) throws Exception { /* 31 */ this.references = references; /* 32 */ aggMapKeyTypes$5 = (((org.apache.flink.table.types.logical.LogicalType[]) references[0])); /* 33 */ aggBufferTypes$6 = (((org.apache.flink.table.types.logical.LogicalType[]) references[1])); /* 34 */ this.setup(task, config, output); /* 35 */ if (this instanceof org.apache.flink.streaming.api.operators.AbstractStreamOperator) { /* 36 */ ((org.apache.flink.streaming.api.operators.AbstractStreamOperator) this) /* 37 */ .setProcessingTimeService(processingTimeService); /* 38 */ } /* 39 */ } /* 40 */ /* 41 */ @Override /* 42 */ public void open() throws Exception { /* 43 */ super.open(); /* 44 */ aggregateMap$7 = new org.apache.flink.table.runtime.operators.aggregate.BytesHashMap(this.getContainingTask(),this.getContainingTask().getEnvironment().getMemoryManager(),computeMemorySize(), aggMapKeyTypes$5, aggBufferTypes$6); /* 45 */ /* 46 */ /* 47 */ emptyAggBufferWriterTerm$10.reset(); /* 48 */ /* 49 */ /* 50 */ if (true) { /* 51 */ emptyAggBufferWriterTerm$10.setNullAt(0); /* 52 */ } else { /* 53 */ emptyAggBufferWriterTerm$10.writeInt(0, ((int) -1)); /* 54 */ } /* 55 */ /* 56 */ emptyAggBufferWriterTerm$10.complete(); /* 57 */ /* 58 */ } /* 59 */ /* 60 */ @Override /* 61 */ public void processElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord element) throws Exception { /* 62 */ org.apache.flink.table.data.RowData in1 = (org.apache.flink.table.data.RowData) element.getValue(); /* 63 */ /* 64 */ org.apache.flink.table.data.binary.BinaryRowData currentAggBuffer$8; /* 65 */ int field$11; /* 66 */ boolean isNull$11; /* 67 */ int field$12; /* 68 */ boolean isNull$12; /* 69 */ boolean isNull$13; /* 70 */ int result$14; /* 71 */ org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo lookupInfo$20; /* 72 */ org.apache.flink.table.data.TimestampData field$21; /* 73 */ boolean isNull$21; /* 74 */ boolean isNull$22; /* 75 */ long result$23; /* 76 */ boolean isNull$24; /* 77 */ long result$25; /* 78 */ boolean isNull$26; /* 79 */ long result$27; /* 80 */ boolean isNull$28; /* 81 */ long result$29; /* 82 */ boolean isNull$30; /* 83 */ long result$31; /* 84 */ boolean isNull$32; /* 85 */ long result$33; /* 86 */ boolean isNull$34; /* 87 */ boolean result$35; /* 88 */ boolean isNull$36; /* 89 */ long result$37; /* 90 */ boolean isNull$38; /* 91 */ long result$39; /* 92 */ boolean isNull$40; /* 93 */ long result$41; /* 94 */ boolean isNull$42; /* 95 */ long result$43; /* 96 */ boolean isNull$44; /* 97 */ long result$45; /* 98 */ boolean isNull$46; /* 99 */ long result$47; /* 100 */ boolean isNull$48; /* 101 */ long result$49; /* 102 */ boolean isNull$50; /* 103 */ long result$51; /* 104 */ boolean isNull$52; /* 105 */ long result$53; /* 106 */ boolean isNull$55; /* 107 */ long result$56; /* 108 */ boolean isNull$57; /* 109 */ long result$58; /* 110 */ /* 111 */ /* 112 */ if (!in1.isNullAt(1)) { /* 113 */ hasInput = true; /* 114 */ // input field access for group key projection, window/pane assign /* 115 */ // and aggregate map update /* 116 */ isNull$11 = in1.isNullAt(0); /* 117 */ field$11 = -1; /* 118 */ if (!isNull$11) { /* 119 */ field$11 = in1.getInt(0); /* 120 */ } /* 121 */ isNull$21 = in1.isNullAt(1); /* 122 */ field$21 = null; /* 123 */ if (!isNull$21) { /* 124 */ field$21 = in1.getTimestamp(1, 3); /* 125 */ } /* 126 */ // assign timestamp(window or pane) /* 127 */ /* 128 */ /* 129 */ /* 130 */ /* 131 */ /* 132 */ isNull$22 = isNull$21; /* 133 */ result$23 = -1L; /* 134 */ if (!isNull$22) { /* 135 */ /* 136 */ result$23 = field$21.getMillisecond(); /* 137 */ /* 138 */ } /* 139 */ /* 140 */ /* 141 */ isNull$24 = isNull$22 || false; /* 142 */ result$25 = -1L; /* 143 */ if (!isNull$24) { /* 144 */ /* 145 */ result$25 = (long) (result$23 * ((long) 1L)); /* 146 */ /* 147 */ } /* 148 */ /* 149 */ isNull$26 = isNull$21; /* 150 */ result$27 = -1L; /* 151 */ if (!isNull$26) { /* 152 */ /* 153 */ result$27 = field$21.getMillisecond(); /* 154 */ /* 155 */ } /* 156 */ /* 157 */ /* 158 */ isNull$28 = isNull$26 || false; /* 159 */ result$29 = -1L; /* 160 */ if (!isNull$28) { /* 161 */ /* 162 */ result$29 = (long) (result$27 * ((long) 1L)); /* 163 */ /* 164 */ } /* 165 */ /* 166 */ /* 167 */ isNull$30 = isNull$28 || false; /* 168 */ result$31 = -1L; /* 169 */ if (!isNull$30) { /* 170 */ /* 171 */ result$31 = (long) (result$29 - ((long) 0L)); /* 172 */ /* 173 */ } /* 174 */ /* 175 */ /* 176 */ isNull$32 = isNull$30 || false; /* 177 */ result$33 = -1L; /* 178 */ if (!isNull$32) { /* 179 */ /* 180 */ result$33 = (long) (result$31 % ((long) 432000000L)); /* 181 */ /* 182 */ } /* 183 */ /* 184 */ /* 185 */ isNull$34 = isNull$32 || false; /* 186 */ result$35 = false; /* 187 */ if (!isNull$34) { /* 188 */ /* 189 */ result$35 = result$33 < ((int) 0); /* 190 */ /* 191 */ } /* 192 */ /* 193 */ long result$54 = -1L; /* 194 */ boolean isNull$54; /* 195 */ if (result$35) { /* 196 */ /* 197 */ /* 198 */ /* 199 */ /* 200 */ /* 201 */ /* 202 */ isNull$36 = isNull$21; /* 203 */ result$37 = -1L; /* 204 */ if (!isNull$36) { /* 205 */ /* 206 */ result$37 = field$21.getMillisecond(); /* 207 */ /* 208 */ } /* 209 */ /* 210 */ /* 211 */ isNull$38 = isNull$36 || false; /* 212 */ result$39 = -1L; /* 213 */ if (!isNull$38) { /* 214 */ /* 215 */ result$39 = (long) (result$37 * ((long) 1L)); /* 216 */ /* 217 */ } /* 218 */ /* 219 */ /* 220 */ isNull$40 = isNull$38 || false; /* 221 */ result$41 = -1L; /* 222 */ if (!isNull$40) { /* 223 */ /* 224 */ result$41 = (long) (result$39 - ((long) 0L)); /* 225 */ /* 226 */ } /* 227 */ /* 228 */ /* 229 */ isNull$42 = isNull$40 || false; /* 230 */ result$43 = -1L; /* 231 */ if (!isNull$42) { /* 232 */ /* 233 */ result$43 = (long) (result$41 % ((long) 432000000L)); /* 234 */ /* 235 */ } /* 236 */ /* 237 */ /* 238 */ isNull$44 = isNull$42 || false; /* 239 */ result$45 = -1L; /* 240 */ if (!isNull$44) { /* 241 */ /* 242 */ result$45 = (long) (result$43 + ((long) 432000000L)); /* 243 */ /* 244 */ } /* 245 */ /* 246 */ isNull$54 = isNull$44; /* 247 */ if (!isNull$54) { /* 248 */ result$54 = result$45; /* 249 */ } /* 250 */ } /* 251 */ else { /* 252 */ /* 253 */ /* 254 */ /* 255 */ /* 256 */ /* 257 */ isNull$46 = isNull$21; /* 258 */ result$47 = -1L; /* 259 */ if (!isNull$46) { /* 260 */ /* 261 */ result$47 = field$21.getMillisecond(); /* 262 */ /* 263 */ } /* 264 */ /* 265 */ /* 266 */ isNull$48 = isNull$46 || false; /* 267 */ result$49 = -1L; /* 268 */ if (!isNull$48) { /* 269 */ /* 270 */ result$49 = (long) (result$47 * ((long) 1L)); /* 271 */ /* 272 */ } /* 273 */ /* 274 */ /* 275 */ isNull$50 = isNull$48 || false; /* 276 */ result$51 = -1L; /* 277 */ if (!isNull$50) { /* 278 */ /* 279 */ result$51 = (long) (result$49 - ((long) 0L)); /* 280 */ /* 281 */ } /* 282 */ /* 283 */ /* 284 */ isNull$52 = isNull$50 || false; /* 285 */ result$53 = -1L; /* 286 */ if (!isNull$52) { /* 287 */ /* 288 */ result$53 = (long) (result$51 % ((long) 432000000L)); /* 289 */ /* 290 */ } /* 291 */ /* 292 */ isNull$54 = isNull$52; /* 293 */ if (!isNull$54) { /* 294 */ result$54 = result$53; /* 295 */ } /* 296 */ } /* 297 */ isNull$55 = isNull$24 || isNull$54; /* 298 */ result$56 = -1L; /* 299 */ if (!isNull$55) { /* 300 */ /* 301 */ result$56 = (long) (result$25 - result$54); /* 302 */ /* 303 */ } /* 304 */ /* 305 */ /* 306 */ isNull$57 = isNull$55 || false; /* 307 */ result$58 = -1L; /* 308 */ if (!isNull$57) { /* 309 */ /* 310 */ result$58 = (long) (result$56 - ((long) 0L)); /* 311 */ /* 312 */ } /* 313 */ /* 314 */ // process each input /* 315 */ /* 316 */ // build aggregate map key /* 317 */ /* 318 */ /* 319 */ aggMapKeyWriter$4.reset(); /* 320 */ /* 321 */ /* 322 */ if (false) { /* 323 */ aggMapKeyWriter$4.setNullAt(0); /* 324 */ } else { /* 325 */ aggMapKeyWriter$4.writeLong(0, result$58); /* 326 */ } /* 327 */ /* 328 */ aggMapKeyWriter$4.complete(); /* 329 */ /* 330 */ // aggregate by each input with assigned timestamp /* 331 */ // look up output buffer using current key (grouping keys ..., assigned timestamp) /* 332 */ lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3); /* 333 */ currentAggBuffer$8 = lookupInfo$20.getValue(); /* 334 */ if (!lookupInfo$20.isFound()) { /* 335 */ /* 336 */ // append empty agg buffer into aggregate map for current group key /* 337 */ try { /* 338 */ currentAggBuffer$8 = /* 339 */ aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9); /* 340 */ } catch (java.io.EOFException exp) { /* 341 */ /* 342 */ LOG$2.info("BytesHashMap out of memory with {} entries, output directly.", aggregateMap$7.getNumElements()); /* 343 */ // hash map out of memory, output directly /* 344 */ /* 345 */ org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> iterator = /* 346 */ aggregateMap$7.getEntryIterator(); /* 347 */ while (iterator.next(reuseAggMapEntry$19) != null) { /* 348 */ /* 349 */ /* 350 */ /* 351 */ hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18); /* 352 */ /* 353 */ output.collect(outElement.replace(hashAggOutput)); /* 354 */ } /* 355 */ /* 356 */ // retry append /* 357 */ /* 358 */ // reset aggregate map retry append /* 359 */ aggregateMap$7.reset(); /* 360 */ lookupInfo$20 = aggregateMap$7.lookup(aggMapKey$3); /* 361 */ try { /* 362 */ currentAggBuffer$8 = /* 363 */ aggregateMap$7.append(lookupInfo$20, emptyAggBuffer$9); /* 364 */ } catch (java.io.EOFException e) { /* 365 */ throw new OutOfMemoryError("BytesHashMap Out of Memory."); /* 366 */ } /* 367 */ /* 368 */ /* 369 */ } /* 370 */ } /* 371 */ // aggregate buffer fields access /* 372 */ isNull$12 = currentAggBuffer$8.isNullAt(0); /* 373 */ field$12 = -1; /* 374 */ if (!isNull$12) { /* 375 */ field$12 = currentAggBuffer$8.getInt(0); /* 376 */ } /* 377 */ // do aggregate and update agg buffer /* 378 */ int result$16 = -1; /* 379 */ boolean isNull$16; /* 380 */ if (isNull$11) { /* 381 */ /* 382 */ isNull$16 = isNull$12; /* 383 */ if (!isNull$16) { /* 384 */ result$16 = field$12; /* 385 */ } /* 386 */ } /* 387 */ else { /* 388 */ int result$15 = -1; /* 389 */ boolean isNull$15; /* 390 */ if (isNull$12) { /* 391 */ /* 392 */ isNull$15 = isNull$11; /* 393 */ if (!isNull$15) { /* 394 */ result$15 = field$11; /* 395 */ } /* 396 */ } /* 397 */ else { /* 398 */ /* 399 */ /* 400 */ /* 401 */ isNull$13 = isNull$12 || isNull$11; /* 402 */ result$14 = -1; /* 403 */ if (!isNull$13) { /* 404 */ /* 405 */ result$14 = (int) (field$12 + field$11); /* 406 */ /* 407 */ } /* 408 */ /* 409 */ isNull$15 = isNull$13; /* 410 */ if (!isNull$15) { /* 411 */ result$15 = result$14; /* 412 */ } /* 413 */ } /* 414 */ isNull$16 = isNull$15; /* 415 */ if (!isNull$16) { /* 416 */ result$16 = result$15; /* 417 */ } /* 418 */ } /* 419 */ if (isNull$16) { /* 420 */ currentAggBuffer$8.setNullAt(0); /* 421 */ } else { /* 422 */ currentAggBuffer$8.setInt(0, result$16); /* 423 */ } /* 424 */ /* 425 */ } /* 426 */ } /* 427 */ /* 428 */ /* 429 */ @Override /* 430 */ public void endInput() throws Exception { /* 431 */ org.apache.flink.table.data.binary.BinaryRowData currentAggBuffer$8; /* 432 */ int field$11; /* 433 */ boolean isNull$11; /* 434 */ int field$12; /* 435 */ boolean isNull$12; /* 436 */ boolean isNull$13; /* 437 */ int result$14; /* 438 */ org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.LookupInfo lookupInfo$20; /* 439 */ org.apache.flink.table.data.TimestampData field$21; /* 440 */ boolean isNull$21; /* 441 */ boolean isNull$22; /* 442 */ long result$23; /* 443 */ boolean isNull$24; /* 444 */ long result$25; /* 445 */ boolean isNull$26; /* 446 */ long result$27; /* 447 */ boolean isNull$28; /* 448 */ long result$29; /* 449 */ boolean isNull$30; /* 450 */ long result$31; /* 451 */ boolean isNull$32; /* 452 */ long result$33; /* 453 */ boolean isNull$34; /* 454 */ boolean result$35; /* 455 */ boolean isNull$36; /* 456 */ long result$37; /* 457 */ boolean isNull$38; /* 458 */ long result$39; /* 459 */ boolean isNull$40; /* 460 */ long result$41; /* 461 */ boolean isNull$42; /* 462 */ long result$43; /* 463 */ boolean isNull$44; /* 464 */ long result$45; /* 465 */ boolean isNull$46; /* 466 */ long result$47; /* 467 */ boolean isNull$48; /* 468 */ long result$49; /* 469 */ boolean isNull$50; /* 470 */ long result$51; /* 471 */ boolean isNull$52; /* 472 */ long result$53; /* 473 */ boolean isNull$55; /* 474 */ long result$56; /* 475 */ boolean isNull$57; /* 476 */ long result$58; /* 477 */ /* 478 */ org.apache.flink.util.MutableObjectIterator<org.apache.flink.table.runtime.operators.aggregate.BytesHashMap.Entry> iterator = /* 479 */ aggregateMap$7.getEntryIterator(); /* 480 */ while (iterator.next(reuseAggMapEntry$19) != null) { /* 481 */ /* 482 */ /* 483 */ /* 484 */ hashAggOutput.replace(reuseAggMapKey$17, reuseAggBuffer$18); /* 485 */ /* 486 */ output.collect(outElement.replace(hashAggOutput)); /* 487 */ } /* 488 */ /* 489 */ } /* 490 */ /* 491 */ /* 492 */ @Override /* 493 */ public void close() throws Exception { /* 494 */ super.close(); /* 495 */ aggregateMap$7.free(); /* 496 */ /* 497 */ } /* 498 */ /* 499 */ /* 500 */ } /* 501 */ Traceback (most recent call last): File "/home/alex/.config/JetBrains/PyCharm2020.2/scratches/scratch_903.py", line 32, in <module> print(table.to_pandas()) File "/home/alex/work/flink/flink-python/pyflink/table/table.py", line 829, in to_pandas if batches.hasNext(): File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/java_gateway.py", line 1285, in __call__ return_value = get_return_value( File "/home/alex/work/flink/flink-python/pyflink/util/exceptions.py", line 147, in deco return f(*a, **kw) File "/home/alex/.cache/pypoetry/virtualenvs/relycomply-WA1zLZ2n-py3.8/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value raise Py4JJavaError( py4j.protocol.Py4JJavaError: An error occurred while calling o51.hasNext. : java.lang.RuntimeException: Failed to fetch next result at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:106) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.hasNext(CollectResultIterator.java:77) at org.apache.flink.table.planner.sinks.SelectTableSinkBase$RowIteratorWrapper.hasNext(SelectTableSinkBase.java:115) at org.apache.flink.table.api.internal.TableResultImpl$CloseableRowIteratorWrapper.hasNext(TableResultImpl.java:355) at org.apache.flink.table.runtime.arrow.ArrowUtils$1.hasNext(ArrowUtils.java:644) at org.apache.flink.table.runtime.arrow.ArrowUtils$2.hasNext(ArrowUtils.java:666) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.io.IOException: Failed to fetch job execution result at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:175) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.next(CollectResultFetcher.java:126) at org.apache.flink.streaming.api.operators.collect.CollectResultIterator.nextResultFromFetcher(CollectResultIterator.java:103) ... 16 more Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:395) at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2022) at org.apache.flink.streaming.api.operators.collect.CollectResultFetcher.getAccumulatorResults(CollectResultFetcher.java:172) ... 18 more Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$2(MiniClusterJobClient.java:119) at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:680) at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:658) at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2094) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.getJobExecutionResult(MiniClusterJobClient.java:117) ... 19 more Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:116) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:78) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:217) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:210) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:204) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:526) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:413) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.base/java.lang.reflect.Method.invoke(Method.java:566) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:284) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:199) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at akka.actor.Actor$class.aroundReceive(Actor.scala:517) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: java.lang.RuntimeException: Could not instantiate generated class 'LocalHashWinAggWithoutKeys$59' at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:67) at org.apache.flink.table.runtime.operators.CodeGenOperatorFactory.createStreamOperator(CodeGenOperatorFactory.java:40) at org.apache.flink.streaming.api.operators.StreamOperatorFactoryUtil.createOperator(StreamOperatorFactoryUtil.java:70) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperator(OperatorChain.java:613) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:583) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOperatorChain(OperatorChain.java:574) at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:526) at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:164) at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:485) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:533) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:722) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:547) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: org.apache.flink.util.FlinkRuntimeException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:68) at org.apache.flink.table.runtime.generated.GeneratedClass.compile(GeneratedClass.java:78) at org.apache.flink.table.runtime.generated.GeneratedClass.newInstance(GeneratedClass.java:65) ... 13 more Caused by: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2203) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache.get(LocalCache.java:3937) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4739) at org.apache.flink.table.runtime.generated.CompileUtils.compile(CompileUtils.java:66) ... 15 more Caused by: org.apache.flink.api.common.InvalidProgramException: Table program cannot be compiled. This is a bug. Please file an issue. at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:81) at org.apache.flink.table.runtime.generated.CompileUtils.lambda$compile$1(CompileUtils.java:66) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4742) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3527) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2319) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2282) at org.apache.flink.shaded.guava18.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2197) ... 18 more Caused by: org.codehaus.commons.compiler.CompileException: Line 351, Column 33: A method named "replace" is not declared in any enclosing class nor any supertype, nor through a static import at org.codehaus.janino.UnitCompiler.compileError(UnitCompiler.java:12124) at org.codehaus.janino.UnitCompiler.findIMethod(UnitCompiler.java:8997) at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5060) at org.codehaus.janino.UnitCompiler.access$9100(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4421) at org.codehaus.janino.UnitCompiler$16.visitMethodInvocation(UnitCompiler.java:4394) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4394) at org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5575) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:3781) at org.codehaus.janino.UnitCompiler.access$5900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3760) at org.codehaus.janino.UnitCompiler$13.visitMethodInvocation(UnitCompiler.java:3732) at org.codehaus.janino.Java$MethodInvocation.accept(Java.java:5062) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3732) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2360) at org.codehaus.janino.UnitCompiler.access$1800(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1494) at org.codehaus.janino.UnitCompiler$6.visitExpressionStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$ExpressionStatement.accept(Java.java:2871) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) at org.codehaus.janino.Java$Block.accept(Java.java:2776) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1842) at org.codehaus.janino.UnitCompiler.access$2200(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1498) at org.codehaus.janino.UnitCompiler$6.visitWhileStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$WhileStatement.accept(Java.java:3052) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) at org.codehaus.janino.Java$Block.accept(Java.java:2776) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileTryCatch(UnitCompiler.java:3136) at org.codehaus.janino.UnitCompiler.compileTryCatchFinally(UnitCompiler.java:2966) at org.codehaus.janino.UnitCompiler.compileTryCatchFinallyWithResources(UnitCompiler.java:2770) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2742) at org.codehaus.janino.UnitCompiler.access$2300(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1499) at org.codehaus.janino.UnitCompiler$6.visitTryStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$TryStatement.accept(Java.java:3238) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) at org.codehaus.janino.Java$Block.accept(Java.java:2776) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476) at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495) at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:1553) at org.codehaus.janino.UnitCompiler.access$1700(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1493) at org.codehaus.janino.UnitCompiler$6.visitBlock(UnitCompiler.java:1487) at org.codehaus.janino.Java$Block.accept(Java.java:2776) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:2476) at org.codehaus.janino.UnitCompiler.access$1900(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1495) at org.codehaus.janino.UnitCompiler$6.visitIfStatement(UnitCompiler.java:1487) at org.codehaus.janino.Java$IfStatement.accept(Java.java:2947) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:1487) at org.codehaus.janino.UnitCompiler.compileStatements(UnitCompiler.java:1567) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:3388) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1357) at org.codehaus.janino.UnitCompiler.compileDeclaredMethods(UnitCompiler.java:1330) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:822) at org.codehaus.janino.UnitCompiler.compile2(UnitCompiler.java:432) at org.codehaus.janino.UnitCompiler.access$400(UnitCompiler.java:215) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:411) at org.codehaus.janino.UnitCompiler$2.visitPackageMemberClassDeclaration(UnitCompiler.java:406) at org.codehaus.janino.Java$PackageMemberClassDeclaration.accept(Java.java:1414) at org.codehaus.janino.UnitCompiler.compile(UnitCompiler.java:406) at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:378) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:237) at org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:465) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:216) at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:207) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80) at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75) at org.apache.flink.table.runtime.generated.CompileUtils.doCompile(CompileUtils.java:78) ... 24 more ``` However it works fine in streaming mode: ```python env_settings = ( EnvironmentSettings.new_instance().in_streaming_mode().use_blink_planner().build() ) table_env = StreamTableEnvironment.create(environment_settings=env_settings) ``` How the table is created seems irrelevant - this raises the same error: ```python from datetime import datetime from pyflink.table import DataTypes, BatchTableEnvironment, EnvironmentSettings from pyflink.table.window import Tumble env_settings = ( EnvironmentSettings.new_instance().in_batch_mode().use_blink_planner().build() ) table_environment = BatchTableEnvironment.create(environment_settings=env_settings) transactions = table_environment.from_elements( [ (1, datetime(2000, 1, 1, 0, 0, 0)), (-2, datetime(2000, 1, 2, 0, 0, 0)), (3, datetime(2000, 1, 3, 0, 0, 0)), (-4, datetime(2000, 1, 4, 0, 0, 0)), ], DataTypes.ROW( [ DataTypes.FIELD("amount", DataTypes.BIGINT()), DataTypes.FIELD("ts", DataTypes.TIMESTAMP(3)), ] ), ) table = ( transactions .window(Tumble.over("5.days").on("ts").alias("__window")) .group_by("__window") .select("amount.sum") ) print(table.to_pandas()) ``` -- This message was sent by Atlassian Jira (v8.3.4#803005) |
Free forum by Nabble | Edit this page |