Distributed cache fault

Vasyl Bervetskyi
Hi there,

I have run into an issue adding a file to the distributed cache in Flink.
My setup:

- Java 1.8
- Flink 1.8
- OS: Windows and Linux

Test scenario:

1. Create a simple stream environment.
2. Add a local file to the distributed cache.
3. Add a simple source function and sink.
4. Execute the job from the CLI (Windows/Linux).

We need to run our job from the Flink CLI in order to restore it from a savepoint or checkpoint, and pipelines that use the distributed cache fail when run this way.
The behavior also differs between Linux and Windows: on Windows we get "java.nio.file.InvalidPathException: Illegal char <:> at index 4", while on Linux Flink simply freezes (it stops doing anything, with no error message or stack trace).

Here is my code for the Windows environment:


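package test;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class CachePipeline {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
        // Register a local file in the distributed cache under the name "CacheFile".
        see.registerCachedFile("file:///D:/test.csv", "CacheFile");

        // Simple source function; its output is printed to stdout.
        see.addSource(new SourceFunction<Object>() {
            @Override
            public void run(SourceContext<Object> ctx) throws Exception {
                ctx.collect(new Object());
            }

            @Override
            public void cancel() {

            }
        }).print();

        see.execute();
    }
}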

The command I used to run the job:

flink run -c test.CachePipeline D:\path\to\jar\cache-test.jar

Has anybody run into this?

Re: Distributed cache fault

Till Rohrmann
Hi Vasyl,

please post this kind of question to Flink's user ML, since the dev ML is
used for development discussions.

For the failure on Windows, could you share the complete stack trace so we
can see where exactly it fails? It looks as if the scheme part of the URI
causes problems on Windows.
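A minimal, Flink-independent sketch in plain Java of why the reported index points at the scheme: in `file:///D:/test.csv`, index 4 is the `:` after `file`, and the Windows path parser rejects that character if the whole URI string is treated as a file-system path. Parsing the string as a `java.net.URI` first strips the scheme before the OS-specific path parser runs. The class `UriPathDemo` and method `toLocalPath` are hypothetical names; this only illustrates JDK behaviour and does not show where, or whether, the Flink CLI does this internally.

```java
import java.net.URI;
import java.nio.file.InvalidPathException;
import java.nio.file.Path;
import java.nio.file.Paths;

public class UriPathDemo {
    // Hypothetical helper: parse the string as a URI first, so the "file:"
    // scheme is removed before the platform path parser sees the text.
    public static Path toLocalPath(String fileUri) {
        return Paths.get(URI.create(fileUri));
    }

    public static void main(String[] args) {
        String cached = "file:///D:/test.csv";

        // Index 4 is the ':' separating the "file" scheme from the rest.
        System.out.println("colon index: " + cached.indexOf(':'));

        // Handing the raw URI string to the path parser: on Windows this is
        // expected to throw "Illegal char <:> at index 4"; on Linux ':' is a
        // legal file-name character, so no exception is thrown there.
        try {
            System.out.println("raw string parsed as: " + Paths.get(cached));
        } catch (InvalidPathException e) {
            System.out.println("raw string rejected: " + e.getMessage());
        }

        // D:\test.csv on Windows, /D:/test.csv on Linux.
        System.out.println("via URI: " + toLocalPath(cached));
    }
}
```

The contrast between the two calls is the point: the same string is fine as a URI but not as a Windows path.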

Looking at the example program, you have effectively implemented a busy
loop which will eat up all your CPU cycles. Moreover, you create a new
object in every iteration, which puts a high load on the JVM's GC. This
might explain why Flink seems to freeze. Could you check the CPU load on
the machines? Alternatively, you could insert a `Thread.sleep(1)` after
every `collect()` call. As a side note, you should always emit objects
under the checkpointing lock. Otherwise Flink cannot give you proper
processing guarantees:

synchronized (ctx.getCheckpointLock()) {
    ctx.collect(...);
}
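The loop shape suggested above (emit under a lock, then sleep briefly after each emit), combined with the usual `volatile` running flag so that `cancel()` actually stops `run()`, can be sketched in plain Java without Flink. This is a hypothetical stand-in, not Flink code: `checkpointLock` plays the role of `ctx.getCheckpointLock()` and the `emitted` list plays the role of `ctx.collect(...)`. In a real `SourceFunction`, the same structure would go into `run()` and `cancel()`.

```java
import java.util.ArrayList;
import java.util.List;

public class ThrottledSource {
    // Hypothetical stand-in for ctx.getCheckpointLock().
    private final Object checkpointLock = new Object();
    // Hypothetical stand-in for the records passed to ctx.collect(...).
    private final List<Long> emitted = new ArrayList<>();
    // volatile so that a cancel() call from another thread is seen by run().
    private volatile boolean running = true;

    public void run() throws InterruptedException {
        long next = 0;
        while (running) {
            // Emit while holding the lock, so emission and checkpoints
            // cannot interleave.
            synchronized (checkpointLock) {
                emitted.add(next++);
            }
            // Sleep outside the lock to throttle the loop instead of spinning.
            Thread.sleep(1);
        }
    }

    public void cancel() {
        running = false;
    }

    public int emittedCount() {
        synchronized (checkpointLock) {
            return emitted.size();
        }
    }

    public static void main(String[] args) throws Exception {
        ThrottledSource source = new ThrottledSource();
        Thread worker = new Thread(() -> {
            try {
                source.run();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        Thread.sleep(50);
        source.cancel();
        worker.join();
        System.out.println("emitted " + source.emittedCount() + " records");
    }
}
```

The exact count printed depends on scheduling; the relevant properties are that the loop no longer spins at full CPU and that it terminates once `cancel()` is called.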

Cheers,
Till

On Mon, May 27, 2019 at 5:54 PM Vasyl Bervetskyi <[hidden email]>
wrote:
