Flink Redis connectivity

classic Classic list List threaded Threaded
6 messages Options
Reply | Threaded
Open this post in threaded view
|

Flink Redis connectivity

Ramya Ramamurthy
Hi,

As per the understanding we have from the documentation, I guess its not
possible to take the redis connection within the Data Stream. In that case,
how should i proceed ? How can i access a DB client object within the
stream ??

I am using Flink 1.7. any help here would be appreciated. Thanks.

  RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
                RedisConnection<String, String> client =
redisClient.connect();
DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
String>) value -> {

            String ct = value.getField(5).toString();

            String res = "";
            if (ct.equals("14") || ct.equals("4")) {

                res = client.set("key", "val");
            }
            return res;
        });

Thanks,
Reply | Threaded
Open this post in threaded view
|

Re: Flink Redis connectivity

Yangze Guo
Hi,

I think you could implement `RichMapFunction` and create `redisClient`
in the `open` method.

Best,
Yangze Guo

On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]> wrote:

>
> Hi,
>
> As per the understanding we have from the documentation, I guess its not
> possible to take the redis connection within the Data Stream. In that case,
> how should i proceed ? How can i access a DB client object within the
> stream ??
>
> I am using Flink 1.7. any help here would be appreciated. Thanks.
>
>   RedisClient redisClient = new
> RedisClient(RedisURI.create("redis://localhost:6379"));
>                 RedisConnection<String, String> client =
> redisClient.connect();
> DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> String>) value -> {
>
>             String ct = value.getField(5).toString();
>
>             String res = "";
>             if (ct.equals("14") || ct.equals("4")) {
>
>                 res = client.set("key", "val");
>             }
>             return res;
>         });
>
> Thanks,
Reply | Threaded
Open this post in threaded view
|

Re: Flink Redis connectivity

Yun Tang
Hi Ramya

Have you ever tried flink-connector-redis<https://github.com/apache/bahir-flink/tree/master/flink-connector-redis> in bahir [1][2]? I think you could use it or obtain some insights.

[1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
[2] https://github.com/apache/bahir-flink

Best
Yun Tang

________________________________
From: Yangze Guo <[hidden email]>
Sent: Tuesday, July 21, 2020 18:50
To: dev <[hidden email]>
Subject: Re: Flink Redis connectivity

Hi,

I think you could implement `RichMapFunction` and create `redisClient`
in the `open` method.

Best,
Yangze Guo

On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]> wrote:

>
> Hi,
>
> As per the understanding we have from the documentation, I guess its not
> possible to take the redis connection within the Data Stream. In that case,
> how should i proceed ? How can i access a DB client object within the
> stream ??
>
> I am using Flink 1.7. any help here would be appreciated. Thanks.
>
>   RedisClient redisClient = new
> RedisClient(RedisURI.create("redis://localhost:6379"));
>                 RedisConnection<String, String> client =
> redisClient.connect();
> DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> String>) value -> {
>
>             String ct = value.getField(5).toString();
>
>             String res = "";
>             if (ct.equals("14") || ct.equals("4")) {
>
>                 res = client.set("key", "val");
>             }
>             return res;
>         });
>
> Thanks,
Reply | Threaded
Open this post in threaded view
|

Re: Flink Redis connectivity

Ramya Ramamurthy
Hi,

Thanks for your response.

I am trying to maintain some state in redis, and for each incoming packet,
I try to map the information on redis, and then finally use ES as a sink to
push the data.
But with this flink-connector-redis, I am not sure if the same can be
achieved. Can you please elaborate on this , so it would be very helpful.

Thank you.


On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <[hidden email]> wrote:

> Hi Ramya
>
> Have you ever tried flink-connector-redis<
> https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> in bahir [1][2]? I think you could use it or obtain some insights.
>
> [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> [2] https://github.com/apache/bahir-flink
>
> Best
> Yun Tang
>
> ________________________________
> From: Yangze Guo <[hidden email]>
> Sent: Tuesday, July 21, 2020 18:50
> To: dev <[hidden email]>
> Subject: Re: Flink Redis connectivity
>
> Hi,
>
> I think you could implement `RichMapFunction` and create `redisClient`
> in the `open` method.
>
> Best,
> Yangze Guo
>
> On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]>
> wrote:
> >
> > Hi,
> >
> > As per the understanding we have from the documentation, I guess its not
> > possible to take the redis connection within the Data Stream. In that
> case,
> > how should i proceed ? How can i access a DB client object within the
> > stream ??
> >
> > I am using Flink 1.7. any help here would be appreciated. Thanks.
> >
> >   RedisClient redisClient = new
> > RedisClient(RedisURI.create("redis://localhost:6379"));
> >                 RedisConnection<String, String> client =
> > redisClient.connect();
> > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> > String>) value -> {
> >
> >             String ct = value.getField(5).toString();
> >
> >             String res = "";
> >             if (ct.equals("14") || ct.equals("4")) {
> >
> >                 res = client.set("key", "val");
> >             }
> >             return res;
> >         });
> >
> > Thanks,
>
Reply | Threaded
Open this post in threaded view
|

Re: Flink Redis connectivity

Yangze Guo
Do you need to read the state you maintained from Redis? The
flink-connector-redis only contains sink operator.

Best,
Yangze Guo

On Thu, Jul 23, 2020 at 3:28 PM Ramya Ramamurthy <[hidden email]> wrote:

>
> Hi,
>
> Thanks for your response.
>
> I am trying to maintain some state in redis, and for each incoming packet,
> I try to map the information on redis, and then finally use ES as a sink to
> push the data.
> But with this flink-connector-redis, I am not sure if the same can be
> achieved. Can you please elaborate on this , so it would be very helpful.
>
> Thank you.
>
>
> On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <[hidden email]> wrote:
>
> > Hi Ramya
> >
> > Have you ever tried flink-connector-redis<
> > https://github.com/apache/bahir-flink/tree/master/flink-connector-redis>
> > in bahir [1][2]? I think you could use it or obtain some insights.
> >
> > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/
> > [2] https://github.com/apache/bahir-flink
> >
> > Best
> > Yun Tang
> >
> > ________________________________
> > From: Yangze Guo <[hidden email]>
> > Sent: Tuesday, July 21, 2020 18:50
> > To: dev <[hidden email]>
> > Subject: Re: Flink Redis connectivity
> >
> > Hi,
> >
> > I think you could implement `RichMapFunction` and create `redisClient`
> > in the `open` method.
> >
> > Best,
> > Yangze Guo
> >
> > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]>
> > wrote:
> > >
> > > Hi,
> > >
> > > As per the understanding we have from the documentation, I guess its not
> > > possible to take the redis connection within the Data Stream. In that
> > case,
> > > how should i proceed ? How can i access a DB client object within the
> > > stream ??
> > >
> > > I am using Flink 1.7. any help here would be appreciated. Thanks.
> > >
> > >   RedisClient redisClient = new
> > > RedisClient(RedisURI.create("redis://localhost:6379"));
> > >                 RedisConnection<String, String> client =
> > > redisClient.connect();
> > > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
> > > String>) value -> {
> > >
> > >             String ct = value.getField(5).toString();
> > >
> > >             String res = "";
> > >             if (ct.equals("14") || ct.equals("4")) {
> > >
> > >                 res = client.set("key", "val");
> > >             }
> > >             return res;
> > >         });
> > >
> > > Thanks,
> >
Reply | Threaded
Open this post in threaded view
|

答复: Flink Redis connectivity

范超
In reply to this post by Ramya Ramamurthy
Hi Ramya

I just tried to code the example which is worked in 1.10 which I using a custom RichFlatMapFunction to connect ,transform data  and release the conn in its override method.

// app.java
public class RedisMapDemo {
    public static void main(String[] args) throws Exception {
        // 1. source
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setParallelism(1);

        DataStream<String> sourceStream = env.fromElements("test_value");

        // 2. custom map function
        DataStream<String> redisUpdatedStream = sourceStream.flatMap(new RedisFlatMap());

        redisUpdatedStream.print();
        env.execute("testing redis flatmap");
    }
}

// this should be saved as another java file  (RedisFlatMap.java)
public class RedisFlatMap extends RichFlatMapFunction<String, String> {
    String TEST_REDIS_KEY = "my_first_lettuce_key";
    RedisClient redisClient;
    StatefulRedisConnection<String, String> connection;
    RedisCommands<String, String> syncCommands;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        redisClient = RedisClient.create("redis://localhost:6379/0");
        connection = redisClient.connect();
        syncCommands = connection.sync();
    }

    @Override
    public void close() throws Exception {
        super.close();
        // maybe release conn here ?
        connection.close();
        redisClient.shutdown();
    }

    @Override
    public void flatMap(String inputString, Collector<String> out)
        throws Exception {
        // 1. write to redis
        // syncCommands.set(TEST_REDIS_KEY,  " Hello, Redis!");

        // 2. read from redis
        String tmpValue = syncCommands.get(TEST_REDIS_KEY);

        // 3. transform
        out.collect(inputString + " - " + tmpValue);
    }
}
-----邮件原件-----
发件人: Ramya Ramamurthy [mailto:[hidden email]]
发送时间: 2020年7月21日 星期二 18:42
收件人: [hidden email]
主题: Flink Redis connectivity

Hi,

As per the understanding we have from the documentation, I guess its not possible to take the redis connection within the Data Stream. In that case, how should i proceed ? How can i access a DB client object within the stream ??

I am using Flink 1.7. any help here would be appreciated. Thanks.

  RedisClient redisClient = new
RedisClient(RedisURI.create("redis://localhost:6379"));
                RedisConnection<String, String> client = redisClient.connect(); DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row,
String>) value -> {

            String ct = value.getField(5).toString();

            String res = "";
            if (ct.equals("14") || ct.equals("4")) {

                res = client.set("key", "val");
            }
            return res;
        });

Thanks,