Hi,
As per the understanding we have from the documentation, I guess its not possible to take the redis connection within the Data Stream. In that case, how should i proceed ? How can i access a DB client object within the stream ?? I am using Flink 1.7. any help here would be appreciated. Thanks. RedisClient redisClient = new RedisClient(RedisURI.create("redis://localhost:6379")); RedisConnection<String, String> client = redisClient.connect(); DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, String>) value -> { String ct = value.getField(5).toString(); String res = ""; if (ct.equals("14") || ct.equals("4")) { res = client.set("key", "val"); } return res; }); Thanks, |
Hi,
I think you could implement `RichMapFunction` and create `redisClient` in the `open` method. Best, Yangze Guo On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]> wrote: > > Hi, > > As per the understanding we have from the documentation, I guess its not > possible to take the redis connection within the Data Stream. In that case, > how should i proceed ? How can i access a DB client object within the > stream ?? > > I am using Flink 1.7. any help here would be appreciated. Thanks. > > RedisClient redisClient = new > RedisClient(RedisURI.create("redis://localhost:6379")); > RedisConnection<String, String> client = > redisClient.connect(); > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, > String>) value -> { > > String ct = value.getField(5).toString(); > > String res = ""; > if (ct.equals("14") || ct.equals("4")) { > > res = client.set("key", "val"); > } > return res; > }); > > Thanks, |
Hi Ramya
Have you ever tried flink-connector-redis<https://github.com/apache/bahir-flink/tree/master/flink-connector-redis> in bahir [1][2]? I think you could use it or obtain some insights. [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/ [2] https://github.com/apache/bahir-flink Best Yun Tang ________________________________ From: Yangze Guo <[hidden email]> Sent: Tuesday, July 21, 2020 18:50 To: dev <[hidden email]> Subject: Re: Flink Redis connectivity Hi, I think you could implement `RichMapFunction` and create `redisClient` in the `open` method. Best, Yangze Guo On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]> wrote: > > Hi, > > As per the understanding we have from the documentation, I guess its not > possible to take the redis connection within the Data Stream. In that case, > how should i proceed ? How can i access a DB client object within the > stream ?? > > I am using Flink 1.7. any help here would be appreciated. Thanks. > > RedisClient redisClient = new > RedisClient(RedisURI.create("redis://localhost:6379")); > RedisConnection<String, String> client = > redisClient.connect(); > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, > String>) value -> { > > String ct = value.getField(5).toString(); > > String res = ""; > if (ct.equals("14") || ct.equals("4")) { > > res = client.set("key", "val"); > } > return res; > }); > > Thanks, |
Hi,
Thanks for your response. I am trying to maintain some state in redis, and for each incoming packet, I try to map the information on redis, and then finally use ES as a sink to push the data. But with this flink-connector-redis, I am not sure if the same can be achieved. Can you please elaborate on this , so it would be very helpful. Thank you. On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <[hidden email]> wrote: > Hi Ramya > > Have you ever tried flink-connector-redis< > https://github.com/apache/bahir-flink/tree/master/flink-connector-redis> > in bahir [1][2]? I think you could use it or obtain some insights. > > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/ > [2] https://github.com/apache/bahir-flink > > Best > Yun Tang > > ________________________________ > From: Yangze Guo <[hidden email]> > Sent: Tuesday, July 21, 2020 18:50 > To: dev <[hidden email]> > Subject: Re: Flink Redis connectivity > > Hi, > > I think you could implement `RichMapFunction` and create `redisClient` > in the `open` method. > > Best, > Yangze Guo > > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]> > wrote: > > > > Hi, > > > > As per the understanding we have from the documentation, I guess its not > > possible to take the redis connection within the Data Stream. In that > case, > > how should i proceed ? How can i access a DB client object within the > > stream ?? > > > > I am using Flink 1.7. any help here would be appreciated. Thanks. > > > > RedisClient redisClient = new > > RedisClient(RedisURI.create("redis://localhost:6379")); > > RedisConnection<String, String> client = > > redisClient.connect(); > > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, > > String>) value -> { > > > > String ct = value.getField(5).toString(); > > > > String res = ""; > > if (ct.equals("14") || ct.equals("4")) { > > > > res = client.set("key", "val"); > > } > > return res; > > }); > > > > Thanks, > |
Do you need to read the state you maintained from Redis? The
flink-connector-redis only contains sink operator. Best, Yangze Guo On Thu, Jul 23, 2020 at 3:28 PM Ramya Ramamurthy <[hidden email]> wrote: > > Hi, > > Thanks for your response. > > I am trying to maintain some state in redis, and for each incoming packet, > I try to map the information on redis, and then finally use ES as a sink to > push the data. > But with this flink-connector-redis, I am not sure if the same can be > achieved. Can you please elaborate on this , so it would be very helpful. > > Thank you. > > > On Wed, Jul 22, 2020 at 9:29 AM Yun Tang <[hidden email]> wrote: > > > Hi Ramya > > > > Have you ever tried flink-connector-redis< > > https://github.com/apache/bahir-flink/tree/master/flink-connector-redis> > > in bahir [1][2]? I think you could use it or obtain some insights. > > > > [1] http://bahir.apache.org/docs/flink/current/flink-streaming-redis/ > > [2] https://github.com/apache/bahir-flink > > > > Best > > Yun Tang > > > > ________________________________ > > From: Yangze Guo <[hidden email]> > > Sent: Tuesday, July 21, 2020 18:50 > > To: dev <[hidden email]> > > Subject: Re: Flink Redis connectivity > > > > Hi, > > > > I think you could implement `RichMapFunction` and create `redisClient` > > in the `open` method. > > > > Best, > > Yangze Guo > > > > On Tue, Jul 21, 2020 at 6:43 PM Ramya Ramamurthy <[hidden email]> > > wrote: > > > > > > Hi, > > > > > > As per the understanding we have from the documentation, I guess its not > > > possible to take the redis connection within the Data Stream. In that > > case, > > > how should i proceed ? How can i access a DB client object within the > > > stream ?? > > > > > > I am using Flink 1.7. any help here would be appreciated. Thanks. > > > > > > RedisClient redisClient = new > > > RedisClient(RedisURI.create("redis://localhost:6379")); > > > RedisConnection<String, String> client = > > > redisClient.connect(); > > > DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, > > > String>) value -> { > > > > > > String ct = value.getField(5).toString(); > > > > > > String res = ""; > > > if (ct.equals("14") || ct.equals("4")) { > > > > > > res = client.set("key", "val"); > > > } > > > return res; > > > }); > > > > > > Thanks, > > |
In reply to this post by Ramya Ramamurthy
Hi Ramya
I just tried to code the example which is worked in 1.10 which I using a custom RichFlatMapFunction to connect ,transform data and release the conn in its override method. // app.java public class RedisMapDemo { public static void main(String[] args) throws Exception { // 1. source final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setParallelism(1); DataStream<String> sourceStream = env.fromElements("test_value"); // 2. custom map function DataStream<String> redisUpdatedStream = sourceStream.flatMap(new RedisFlatMap()); redisUpdatedStream.print(); env.execute("testing redis flatmap"); } } // this should be saved as another java file (RedisFlatMap.java) public class RedisFlatMap extends RichFlatMapFunction<String, String> { String TEST_REDIS_KEY = "my_first_lettuce_key"; RedisClient redisClient; StatefulRedisConnection<String, String> connection; RedisCommands<String, String> syncCommands; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); redisClient = RedisClient.create("redis://localhost:6379/0"); connection = redisClient.connect(); syncCommands = connection.sync(); } @Override public void close() throws Exception { super.close(); // maybe release conn here ? connection.close(); redisClient.shutdown(); } @Override public void flatMap(String inputString, Collector<String> out) throws Exception { // 1. write to redis // syncCommands.set(TEST_REDIS_KEY, " Hello, Redis!"); // 2. read from redis String tmpValue = syncCommands.get(TEST_REDIS_KEY); // 3. transform out.collect(inputString + " - " + tmpValue); } } -----邮件原件----- 发件人: Ramya Ramamurthy [mailto:[hidden email]] 发送时间: 2020年7月21日 星期二 18:42 收件人: [hidden email] 主题: Flink Redis connectivity Hi, As per the understanding we have from the documentation, I guess its not possible to take the redis connection within the Data Stream. In that case, how should i proceed ? How can i access a DB client object within the stream ?? I am using Flink 1.7. any help here would be appreciated. Thanks. RedisClient redisClient = new RedisClient(RedisURI.create("redis://localhost:6379")); RedisConnection<String, String> client = redisClient.connect(); DataStream<String> parsedStream = ipdetailsStream.map((MapFunction<Row, String>) value -> { String ct = value.getField(5).toString(); String res = ""; if (ct.equals("14") || ct.equals("4")) { res = client.set("key", "val"); } return res; }); Thanks, |
Free forum by Nabble | Edit this page |