[DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi all,

I would like to share some thoughts on improving queryable state and introducing a QueryServerProxy component.

I think the current queryable state client is hard to use, because it requires users to know a TaskManager's address and the proxy's port. In production, some business users do not know much about Flink's internals or runtime, yet they sometimes need to query the values of state.

IMO, the root cause is the architecture of queryable state. Currently, queryable state clients interact with the client proxy components hosted on each TaskManager. This design makes it hard to encapsulate the points of change and exposes too much detail to the user.

My idea is to introduce a real queryable state server, named e.g. QueryStateProxyServer, which would accept all state query requests, look up its local registry, and then redirect each request to the specific QueryStateClientProxy (which runs on each TaskManager). This server is the only thing users would need to care about, and it would free them from knowing the TaskManagers' addresses and the proxies' ports. The current QueryStateClientProxy would become QueryStateProxyClient.

Generally speaking, the QueryStateProxyServer would have the following roles:

  • act as the proxy for all query clients, receiving all requests and sending the responses;
  • act as a router that redirects the actual query requests to the specific proxy client;
  • maintain a route table registry (state <-> TaskManager, TaskManager <-> proxy client address);
  • offer more fine-grained control, such as result caching, ACL, TTL, SLA (rate limiting), and so on.
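
The routing and registry roles above could be sketched roughly as follows. This is only an illustration under assumptions: the class `RouteTable` and all of its methods are hypothetical and not part of Flink, and the sketch maps each state name to a single TaskManager, whereas real keyed state is spread across TaskManagers by key.

```java
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical route table for the proposed proxy server; not Flink API. */
final class RouteTable {
    // Queryable state name -> ID of the TaskManager hosting it
    // (simplification: one TaskManager per state name).
    private final Map<String, String> stateToTaskManager = new ConcurrentHashMap<>();
    // TaskManager ID -> address of the proxy client running on that TaskManager.
    private final Map<String, InetSocketAddress> taskManagerToProxy = new ConcurrentHashMap<>();

    void registerTaskManager(String taskManagerId, InetSocketAddress proxyAddress) {
        taskManagerToProxy.put(taskManagerId, proxyAddress);
    }

    void registerState(String stateName, String taskManagerId) {
        stateToTaskManager.put(stateName, taskManagerId);
    }

    /** Drops a TaskManager and every state entry that pointed to it. */
    void unregisterTaskManager(String taskManagerId) {
        taskManagerToProxy.remove(taskManagerId);
        stateToTaskManager.values().removeIf(taskManagerId::equals);
    }

    /** Resolves the proxy address a query for the given state should be redirected to. */
    Optional<InetSocketAddress> resolve(String stateName) {
        return Optional.ofNullable(stateToTaskManager.get(stateName))
                .map(taskManagerToProxy::get);
    }
}
```

With such a table, a user-facing client would only need the server's address; the server would call `resolve` and forward the request, so TaskManager addresses and proxy ports never reach the user.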

Regarding the implementation, there are three options:

opt 1:

Let the JobManager act as the query proxy server.
  • pros: reuses the existing JM; not introducing a new process keeps the complexity down;
  • cons: puts a heavy burden on the JM and, depending on the query frequency, may affect its stability.



opt 2:

Introduce a new component that runs as a standalone process and acts as the query proxy server:

  • pros: reduces the burden on the JM and makes it more stable;
  • cons: introducing a new component makes the implementation more complex.


opt 3 (suggested by Stefan Richter):

Combining the two options, the query server could run as a separate entry point (process) and also be integrated with the JobManager.

If we keep it well encapsulated, the only difference would be how new TMs are registered with the query server in the different scenarios: in the JM we might already have this information, while in standalone mode the TMs could, for example, be started with the query server address so that they can register themselves. This would give users the convenience of starting the QS with the JM, and power users the flexibility to reduce the load on their JM.
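
One way to picture the encapsulation described for opt 3 is a server core that is identical in both deployments, with only the source of TM registrations swapped. The sketch below is an assumption-laden illustration: `QueryServerCore`, `RegistrationSource`, `JobManagerSource`, and `SelfRegistrationSource` are hypothetical names, and addresses are plain strings for brevity.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

/** Hypothetical server core; the same code in embedded and standalone mode. */
final class QueryServerCore {
    private final Map<String, String> proxyAddressByTaskManager = new ConcurrentHashMap<>();

    void onTaskManagerRegistered(String taskManagerId, String proxyAddress) {
        proxyAddressByTaskManager.put(taskManagerId, proxyAddress);
    }

    int knownTaskManagers() {
        return proxyAddressByTaskManager.size();
    }
}

/** The only mode-specific part: where TaskManager registrations come from. */
interface RegistrationSource {
    void start(BiConsumer<String, String> onRegistered);
}

/** Embedded mode: the JobManager already knows its TaskManagers and hands them over. */
final class JobManagerSource implements RegistrationSource {
    private final Map<String, String> knownByJobManager;

    JobManagerSource(Map<String, String> knownByJobManager) {
        this.knownByJobManager = knownByJobManager;
    }

    @Override
    public void start(BiConsumer<String, String> onRegistered) {
        knownByJobManager.forEach(onRegistered);
    }
}

/** Standalone mode: each TaskManager is started with the server address and announces itself. */
final class SelfRegistrationSource implements RegistrationSource {
    private BiConsumer<String, String> onRegistered = (id, address) -> { };

    @Override
    public void start(BiConsumer<String, String> onRegistered) {
        this.onRegistered = onRegistered;
    }

    /** Would be invoked when a TaskManager's registration message arrives. */
    void announce(String taskManagerId, String proxyAddress) {
        onRegistered.accept(taskManagerId, proxyAddress);
    }
}
```

In both modes the wiring is `source.start(core::onTaskManagerRegistered)`; nothing in the core needs to know which deployment it is running in.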

IMO, queryable state is a very valuable feature. It lets users query real-time measurement results. I hope it will get the community's attention.

This is just a rough idea. If the community finds it valuable, I will provide a design draft.

What's your opinion? Any feedback and comments are welcome!

Best,
Vino.

RE: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

Shi Quan
Hi,

How about reading the state from RocksDB directly? In that case, the TM host would be unnecessary.

Best

Quan Shi

________________________________
From: vino yang <[hidden email]>
Sent: Thursday, April 25, 2019 10:18:20 PM
To: dev; user
Cc: Stefan Richter; Aljoscha Krettek; [hidden email]
Subject: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Quan,

Thanks for your reply.

Actually, I have not tried this approach.

However, there are two factors we should consider:


   1. Local state storage is not always RocksDB; otherwise Flink
   would not need to provide a queryable state client. What's more, querying
   RocksDB directly still requires knowing an explicit address.
   2. IMO, the more valuable part of the proposal is making the queryable
   state architecture more reasonable, so that it encapsulates more details
   and scales better.

Best,
Vino



On Fri, Apr 26, 2019 at 10:38 AM, Shi Quan <[hidden email]> wrote:

> Hi,
>
> How about take states from RocksDB directly, in this case, TM host is
> unnecessary.
>
> Best
>
> Quan Shi

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

PaulLam
Hi Vino,

Thanks a lot for bringing up the discussion! Queryable state has been in beta for a long time, and because of its complexity and instability I think it does not have many users. Still, there is great value in it: it brings us one step closer to using state as a database.

WRT the architecture, I’d vote for opt 3, because it fits cloud architectures best and avoids putting more burden on the JM (queries can sometimes be slow and resource-intensive). My concern is that on many cluster frameworks container resources are limited (IIUC, the JM and QS would run in the same container). Would the JM get killed if the QS eats up too much memory?

And a minor suggestion: could we introduce a command-line script to set up a QueryableStateClient? That would make it easier for users who want to try out this feature.

Best,
Paul Lam

> On Apr 26, 2019, at 11:09, vino yang <[hidden email]> wrote:
>
> Hi Quan,
>
> Thanks for your reply.
>
> Actually, I did not try this way.
>
> But, there are two factors we should consider:
>
>
>   1. The local state storage is not equals to RocksDB, otherwise Flink
>   does not need to provide a queryable state client. What's more, querying
>   the RocksDB is still an address-explicit action.
>   2. IMO, the proposal's more valuable suggestion is to make the queryable
>   state's architecture more reasonable, let it encapsulated more details and
>   improve its scalability.
>
> Best,
> Vino

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Paul,

Thanks for your reply.

You are right, queryable state currently has few users. And I totally
agree with you that it makes streaming jobs work more like a DB.

Regarding the architecture and your concern: yes, it may affect the
JobManager if they are deployed together.
I think it's important to guarantee the JobManager's availability and
stability, and the QueryProxyServer is just a secondary service component.
That is why I mentioned an SLA policy when describing the role of the
QueryProxyServer; I think it's a solution, though the details need to be
discussed.
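
As a rough illustration of what such an SLA policy might look like, the sketch below caps the number of in-flight queries and rejects the rest immediately instead of queueing them. It is an assumption, not a design from this thread: `QueryAdmission` and `tryExecute` are hypothetical names, and a concurrency cap only bounds load; it does not by itself isolate memory inside a shared container.

```java
import java.util.Optional;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical load-shedding guard for a secondary query service; not Flink API. */
final class QueryAdmission {
    private final Semaphore inFlight;

    QueryAdmission(int maxConcurrentQueries) {
        this.inFlight = new Semaphore(maxConcurrentQueries);
    }

    /**
     * Runs the query if a permit is available; otherwise returns empty right away.
     * Note: a query that itself returns null is also reported as empty here.
     */
    <T> Optional<T> tryExecute(Supplier<T> query) {
        if (!inFlight.tryAcquire()) {
            return Optional.empty(); // shed load rather than compete with the JobManager
        }
        try {
            return Optional.ofNullable(query.get());
        } finally {
            inFlight.release(); // always return the permit, even if the query throws
        }
    }
}
```

A caller that receives an empty result could retry later or surface a "server busy" error, which keeps the query path from starving the primary component.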

About starting a queryable state client from the command line: I think
it's a good and valuable idea.

Best,
Vino.

On Fri, Apr 26, 2019 at 3:31 PM, Paul Lam <[hidden email]> wrote:

> Hi Vino,
>
> Thanks a lot for bringing up the discussion! Queryable state has been at
> beta version for a long time, and due to its complexity and instability I
> think there are not many users, but there’s a great value in it which makes
> state as database one step closer.
>
> WRT the architecture, I’d vote for opt 3, because it fits the cloud
> architecture the most and avoids putting more burdens on JM (sometimes the
> queries could be slow and resources intensive). My concern is that on many
> cluster frameworks the container resources are limited (IIUC, the JM and QS
> are running in the same container), would JM gets killed if QS eats up too
> much memory?
>
> And a minor suggestion: can we introduce a cmd script to setup a
> QueryableStateClient? That would be easier for users who wants to try out
> this feature.
>
> Best,
> Paul Lam

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

Elias Levy
On Fri, Apr 26, 2019 at 1:41 AM vino yang <[hidden email]> wrote:

> You are right, currently, the queryable state has few users. And I totally
> agree with you, it makes the streaming works more like a DB.
>

Alas, I don't think queryable state will really be used much in production
other than for ad hoc queries or debugging.  Real data stores at scale are
resilient, replicated, and with very low downtime.  In my opinion, Flink
jobs don't sufficiently meet these requirements to work as a replacement
for a data store.  Jobs too frequently fail and restart because of
checkpoint failures, particularly ones with large state.  And when a job
does restart, all too often local restore can't be used (e.g. if you lose
a node).  And since there is no fine-grained job recovery and there are no
hot replicas of the data, all the state will need to be restored from the
DFS, which for something with large state can take a while.  It's a nice
idea, just not realistic in practice.

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Elias,

I agree with your opinion that "*Flink jobs don't sufficiently meet these
requirements to work as a replacement for a data store.*"  Actually, I
think that is clearly not Flink's goal. Roughly speaking, a database
consists of two main parts: data query and data storage. What Paul and I
mean is the former.

Yes, you have mentioned its major value: ad hoc queries and debugging (IMO,
especially the former). Providing real-time calculation results over a long
period (no window or a large window) is very important in some scenarios,
such as real-time metrics for real-time OLAP.

So, my opinion is that queryable state is not meant to replace data stores.
However, if we could query state more conveniently, it would make streaming
jobs work more like a DB in terms of querying.

Best,
Vino.

On Sat, Apr 27, 2019 at 1:30 AM, Elias Levy <[hidden email]> wrote:

> On Fri, Apr 26, 2019 at 1:41 AM vino yang <[hidden email]> wrote:
>
> > You are right, currently, the queryable state has few users. And I
> totally
> > agree with you, it makes the streaming works more like a DB.
> >
>
> Alas, I don't think queryable state will really be used much in production
> other than for ad hoc queries or debugging.  Real data stores at scale are
> resilient, replicated, and with very low downtime.  In my opinion, Flink
> jobs don't sufficiently meet these requirements to work as a replacement
> for a data store.  Jobs too frequently fail and restart because of
> checkpoint failures, particularly ones with large state.  And when a job
> does restart, all too often local restore can't be used (e.g. if you loose
> a node).  And since there is no fine grained job recovery and there is no
> hot replicas of the data, all the state will need to be restored from the
> DFS, which for something with large state can take a while.  It's a nice
> idea, just not realistic in practice.
>

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

liyu
Glad to see discussions around QueryableState on the mailing list. It seems
we have widened the scope of the discussion to what the data model in Flink
is and how to (or whether it is possible to) use Flink as a database. I
suggest opening another thread for this bigger topic, and personally I think
the first question to answer is what the relationship between Flink ledger
and QueryableState is.

Back to the user scenario itself, I'd like to raise two open questions about
QueryableState for ad-hoc queries:
1. Currently the isolation level of QueryableState is *Read Uncommitted*,
since a failover might happen and cause data to roll back. Although the
"uncommitted" data will be replayed and eventually become consistent,
applications will see unstable query results. Probably some kinds of
applications could bear such a drawback, but which ones exactly?

2. Currently in Flink, the sink is more commonly regarded as the "result
partition" and the state of operators in the pipeline as "intermediate
data". Using it for debugging is easy to understand, but not for ad-hoc
queries. In other words, what makes users prefer querying the state data
instead of the sink? Or why do we need to query the intermediate data
instead of the result?
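
To make the *Read Uncommitted* point in question 1 concrete, here is a toy model of state with checkpoints. It is not Flink code: `ToyCountState` and its methods are invented for illustration, and the "replay" is simulated by simply applying the same update again after the rollback.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of operator state with checkpoints; illustrative only, not Flink code. */
final class ToyCountState {
    private final Map<String, Long> live = new HashMap<>();
    private Map<String, Long> lastCheckpoint = new HashMap<>();

    /** Applies an update from the stream to the live state. */
    void add(String key, long delta) {
        live.merge(key, delta, Long::sum);
    }

    /** Queryable-state-style read: sees live state, including updates not yet checkpointed. */
    long query(String key) {
        return live.getOrDefault(key, 0L);
    }

    /** Snapshots the live state. */
    void checkpoint() {
        lastCheckpoint = new HashMap<>(live);
    }

    /** Failover: live state rolls back to the last checkpoint; the input is then replayed. */
    void restore() {
        live.clear();
        live.putAll(lastCheckpoint);
    }
}
```

If a client queries after `add("k", 5)`, `checkpoint()`, `add("k", 3)`, it reads 8; after `restore()` the same query reads 5, and only once the update is replayed does it read 8 again. That temporary regression is the unstable result an application would have to tolerate.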

Going further back to the original topic of this thread, introducing a
QueryableStateProxy, I can see some careful consideration of the query load
on the proxy. However, under heavy load the pressure is not only on query
serving but also on metadata requests, which are handled by the JM for now.
So to relieve the pressure on the JM, we should also extract the metadata
serving task, and my suggestion is to introduce a new component like
*StateMetaServer* that takes over both query and metadata serving
responsibilities.

Best Regards,
Yu


On Sat, 27 Apr 2019 at 11:58, vino yang <[hidden email]> wrote:

> Hi Elias,
>
> I agree with your opinion that "*Flink jobs don't sufficiently meet these
> requirements to work as a replacement for a data store.*".  Actually, I
> think it's obviously not Flink's goal. If we think that the database
> contains the main two parts(inexactitude): data query and data store. What
> I and Paul mean is the former.
>
> Yes, you have mentioned it's major value: ad hoc and debugging(IMO,
> especially for the former). To give a real-time calculation result is very
> import for some scene(such as real-time measure for real-time OLAP) in a
> long-term (no-window or large window).
>
> So, my opinion: Queryable state is not dedicated to replacing data stores.
> However, if we could query state more conveniently, it makes the streaming
> works more like DB in query aspect.
>
> Best,
> Vino.

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Yu,

Thanks for your reply. I have some inline comments.

On Sun, Apr 28, 2019 at 12:24 PM, Yu Li <[hidden email]> wrote:

> Glad to see discussions around QueryableState in mailing list, and it seems
> we have included a bigger scope in the discussion, that what's the data
> model in Flink and how to (or is it possible to) use Flink as a database. I
> suggest to open another thread for this bigger topic and personally I think
> the first question should be answered is what's the relationship between
> Flink ledger and QueryableState.
>

*About the scope: yes, it seems big. Actually, I think the questions you
raised make it bigger than I intended.*
*I think we don't need to answer these two questions here (we can discuss
them in another thread, or answer them later).*

*My original thought was that queryable state is hard to use, which may be
why few users use it. The cause and the effect may reinforce each other. And
IMO, the current architecture of queryable state causes this problem, so I
opened this thread to see how to improve it.*

*We mentioned keywords such as "state" and "database" to emphasize that
queryable state is very important. The data model and using Flink as a
database are not the main topic of this thread (as Elias's reply said, many
issues make the road to that goal very long). In this thread I assume we do
not change the core design of state, and the goal is to provide a better
query solution.*

*As for the relationship between ledger and queryable state, I also think
it is outside the scope of this thread.*


>
> Back to the user scenario itself, I'd like to post two open questions about
> QueryableState for ad-hoc query:
> 1. Currently the isolation level of QueryableState is *Read Uncommitted*
> since failover might happen and cause data rollback. Although the
> "uncommitted" data will be replayed again and get final consistency,
> application will see unstable query result. Probably some kind of
> applications could bare such drawback but what exactly?
>

*Yes, the QueryableState's isolation level is *Read Uncommitted*. I think
if we need a higher isolation level, we may need other mechanisms to
guarantee it. I am sorry, I cannot offer a solution.*
*However, I think that does not prevent us from discussing how to improve
the queryable state's architecture, right?*
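
A toy illustration of the *Read Uncommitted* behaviour discussed here, offered only as a sketch. `ToyKeyedState` and its methods are made-up names, not Flink classes, and checkpointing is reduced to copying a map. It shows a query observing a value that a restore later rolls back and that a replay then brings back.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of keyed state with checkpoint and restore; hypothetical, not Flink code. */
final class ToyKeyedState {
    private Map<String, Long> live = new HashMap<>();
    private Map<String, Long> lastCheckpoint = new HashMap<>();

    /** Applies one event to the live state. */
    void add(String key, long delta) {
        live.merge(key, delta, Long::sum);
    }

    /** Snapshots the live state. */
    void checkpoint() {
        lastCheckpoint = new HashMap<>(live);
    }

    /** Simulates a failover: everything since the last checkpoint is lost. */
    void failAndRestore() {
        live = new HashMap<>(lastCheckpoint);
    }

    /** A query reads the live state directly, checkpointed or not. */
    long query(String key) {
        return live.getOrDefault(key, 0L);
    }
}

final class ReadUncommittedDemo {
    public static void main(String[] args) {
        ToyKeyedState state = new ToyKeyedState();
        state.add("clicks", 3);
        state.checkpoint();
        state.add("clicks", 2);                    // not yet covered by a checkpoint
        System.out.println(state.query("clicks")); // 5
        state.failAndRestore();
        System.out.println(state.query("clicks")); // 3
        state.add("clicks", 2);                    // the source replays the lost event
        System.out.println(state.query("clicks")); // 5
    }
}
```

A client polling this state would see 5, then 3, then 5 again, which is the unstable result the question is about.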


>
> 2. Currently in Flink sink is more commonly regarded as the "result
> partition" and state of operators in the pipeline more like "intermediate
> data". Used for debugging purpose is easy to understand but not for ad-hoc
> query. Or in another word, what makes user prefer querying the state data
> instead of sink? Or why we need to query the intermediate data instead of
> the result?
>
>
*About the opinion that the state of operators in the pipeline is more like
"intermediate data": yes, you are right. It is intermediate data, and we
need it in some scenarios.*
*Its value is that it is "real-time". When querying a state, we need its
current value; we cannot wait for the sink. The intermediate data is also
valuable in itself, for example when we just need a real-time measure value
of a partitioned data stream.*


> Further back to the original topic proposed in this thread about
> introducing a QueryableStateProxy, I could see some careful consideration
> on query load on the proxy. However, under heavy load the pressure is not
> only on query serving but also on meta requesting, which is handled by JM
> for now. So to release JM pressure, we should also extract the meta serving
> task out, and my suggestion is to introduce a new component like
> *StateMetaServer* and take over both query and meta serving
> responsibilities.
>

*I think the points about metadata pressure and *StateMetaServer* are good.
We need to take them into account in the design.*
*I mentioned the meta info (registry) in the simple architecture pictures of
the two options, although I emphasized the query proxy server because it is
the main component.*

*Your worry is reasonable. The proxy server architecture is well suited to
handling this, for example through request flow control and by moving the
pressure to a single entry point (for opt2 and opt3, we can serve meta
queries in a single process).*

*Anyway, I just opened this discussion to hear the community's opinion.*


>
> Best Regards,
> Yu
>
>

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

liyu
TL;DR: IMO a more complete solution is to cover both query and meta request
serving in a new component. We could use the proposal here as step one, but
we should have a global plan. And before improving a seemingly not widely
used feature, we'd better weigh the gain against the effort.

Let me clarify the purpose of my previous questions: before we start
detailed design and code development, it's better to reach consensus on:
1. What's the value of the work?
    - As noted, the queryable state feature has been implemented for some
time but is not widely used in production (AFAIK). Why? If it has been used
in critical scenarios, what are those scenarios?
    - I think it's a good time to discuss this (since it was raised in this
thread by others) and confirm the value of the effort to improve this feature.
2. Would there be duplicated work?
    - This is the main reason I asked about the relationship between ledger
and queryable-state.

And some answers to the inline comments:

bq. About the relationship between ledger and Queryable State, I also think
it is out of this thread
True, that's why I suggested opening another thread. But as mentioned
above, the question is relevant if we think about the whole picture.

bq. Yes, the QueryableState's isolation level is *Read Uncommitted*...
However, I think it would not affect we discuss how to improve the
queryable state's architecture, right?
Correct, but my real question here is what kind of application could bear
the changing query result.

bq. The intermediate data is also valuable, for example, we just need a
partitioned data stream's real-time measure value.
In this case there must be some complicated operation in the pipeline which
causes long latency at the sink? Could you say more about the real-world
case? Thanks.

bq. Your worry is reasonable.
Then I suggest thinking about it as a whole. We could split the
implementation into steps, but it is better to have a global plan that makes
it really applicable in production (under heavy load).

Best Regards,
Yu



Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Yu,

OK, now I understand your comments more clearly.

Let me answer your two questions:

1. The value of this work:

As I mentioned in my last reply to you, queryable state is hard to use, and
this may be why few users use it. The cause and the result reinforce each
other, and IMO the current queryable state architecture causes this problem.
That is why I opened this thread: we are trying to address the issue and
break that cycle of cause and effect.

As for the value of queryable state itself, I think it needs no further
clarification; earlier replies from others have confirmed it.

We have not used this feature in critical scenarios, but many common
scenarios suit it, e.g.:


   - the calculation period is very long, but a more fine-grained
   real-time result is needed, for example getting the current measure
   value for real-time OLAP, getting the consumed offset of a message
   system, and so on;
   - debugging applications;
   - ....

If queryable state offered a better user experience, IMO more and more
users would use it.

2. About duplicated work: I do not know. For now, the ledger project has
not been added to Flink's repository. But I can ping @Stephan; he may
want to answer this question.

As for a global plan, I totally agree with you. I did not provide more
thought and detail for the reason I gave in my earlier reply: I did not
know the community's opinion or whether this could be added to Flink's
roadmap.

All right, we can discuss more details. IMO, a more complete solution may
contain the following:


   - refactor the query client's API: with a meta-service we could provide
   more useful APIs, e.g. scanning all keys or a key range; obviously, the
   client API needs to be adjusted to carry the new information for a query;
   - introduce a query proxy server, which contains a request router,
   metadata management/sync, ACL, SLA, and more plugins (I think a plugin
   architecture is a good choice) or sub-components;
      - interaction with the JobManager
      - interaction with the TaskManager
      - the plugin loading strategy
   - refactor the actual querier that runs on each TaskManager; it needs to
   interact with the query proxy server.

Obviously, each step can also be split into several steps.
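
To make the router and plugin roles listed above a bit more concrete, here is a minimal sketch. It assumes a single in-memory route table and a simple plugin hook. All names in it (`QueryRequest`, `QueryPlugin`, `QueryProxyServer`, the `tm-1:9069` address) are hypothetical and do not exist in Flink. It only shows the control flow, not networking or metadata sync.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical request type: who asks, for which state, for which key. */
final class QueryRequest {
    final String user;
    final String stateName;
    final String key;

    QueryRequest(String user, String stateName, String key) {
        this.user = user;
        this.stateName = stateName;
        this.key = key;
    }
}

/** Hypothetical plugin hook that may reject a request before routing (ACL, rate limit, ...). */
interface QueryPlugin {
    /** Returns a rejection reason, or empty if the request may proceed. */
    Optional<String> check(QueryRequest request);
}

/** Hypothetical single entry point: runs the plugins, then resolves the TaskManager-side querier. */
final class QueryProxyServer {
    // Route table: state name -> address of the querier on the owning TaskManager.
    private final Map<String, String> routeTable = new HashMap<>();
    private final List<QueryPlugin> plugins = new ArrayList<>();

    void registerState(String stateName, String querierAddress) {
        routeTable.put(stateName, querierAddress);
    }

    void addPlugin(QueryPlugin plugin) {
        plugins.add(plugin);
    }

    /** Returns the address to forward to; throws if a plugin rejects or the state is unknown. */
    String route(QueryRequest request) {
        for (QueryPlugin plugin : plugins) {
            Optional<String> rejection = plugin.check(request);
            if (rejection.isPresent()) {
                throw new IllegalStateException("rejected: " + rejection.get());
            }
        }
        String address = routeTable.get(request.stateName);
        if (address == null) {
            throw new IllegalArgumentException("unknown state: " + request.stateName);
        }
        return address;
    }
}

final class QueryProxyDemo {
    public static void main(String[] args) {
        QueryProxyServer server = new QueryProxyServer();
        server.registerState("order-count", "tm-1:9069");
        // A trivial ACL plugin: only the user "analyst" may query.
        server.addPlugin(req -> "analyst".equals(req.user)
                ? Optional.<String>empty()
                : Optional.of("user not allowed"));
        System.out.println(server.route(new QueryRequest("analyst", "order-count", "k1")));
    }
}
```

In this shape the client only needs the proxy server's address; the TaskManager addresses stay inside the route table, and ACL or rate limiting can be added without touching the routing code.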

I look forward to your suggestions and guidance. If you have any questions,
please let me know.

Best,
Vino



Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

Elias Levy
In reply to this post by vino yang
On Fri, Apr 26, 2019 at 8:58 PM vino yang <[hidden email]> wrote:

> I agree with your opinion that "*Flink jobs don't sufficiently meet these
> requirements to work as a replacement for a data store.*".  Actually, I
> think it's obviously not Flink's goal.
>

I would not be so sure.  When data Artisans introduced Queryable State in
Flink <https://www.ververica.com/blog/queryable-state-use-case-demo>, one of
the use cases was explicitly removing the need for external key-value
stores. This mirrored Confluent's earlier introduction of Interactive
Queries in Kafka Streams
<https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/>,
and they certainly saw querying of streaming state as a possible
alternative to traditional data stores.

Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Elias,

OK, I think we do not need to agree on this point. To keep the discussion
efficient, let's focus on improving the query architecture.

Best,
Vino


Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

Aljoscha Krettek-2
Hi Everyone,

I think this is a good discussion and valuable ideas have come up. However, it seems none of the committers and/or PMCs currently have time to work on this subject. Till, who’s focusing on the distributed runtime side, which is touched quite a bit by queryable state, is currently focusing on refactorings that we need for better batch scheduling and resource management, among other things. I and other committers that work more on the API side are focusing on reworking the Table API to separate the concerns and facilitate the incorporation of the new Table API runner that is being developed by contributors at Alibaba.

I’m not saying that you shouldn’t discuss this topic or maybe even develop a proof of concept. It will probably not be added to Flink in the foreseeable future because of lack of committer bandwidth, though. I hope this is not too discouraging.

Aljoscha



Re: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Aljoscha,

Thanks for agreeing this is a valuable idea. I know the committers and PMC
members are busy ahead of Flink 1.9 and even 2.0. We think this is a good
improvement for queryable state and for some interactive query scenarios.
I don't mind if the timeline is long.

Although it cannot be added to Flink now, IMO we can still do some work
towards that goal. For example:


   - Write a design draft (I want to try this);
   - Discuss and review the design details;
   - Once we agree on the design, split it into several subtasks
   (I think other contributors would like to implement it together).

I will try to invite some committers to review the design and give
suggestions. What do you think?

Best,
Vino

Aljoscha Krettek <[hidden email]> 于2019年5月15日周三 下午4:27写道:

> Hi Everyone,
>
> I think this is a good discussion and valuable ideas have come up.
> However, it seems none of the committers and/or PMCs currently have time to
> work on this subject. Till, who’s focusing on the distributed runtime side,
> which is touched quite a bit by queryable state, is currently focusing on
> refactorings that we need for better batch scheduling and resource
> management, among other things. I and other committers that work more on
> the API side are focusing on reworking the Table API to separate the
> concerns and facilitate the incorporation of the new Table API runner that
> is being developed by contributors at Alibaba.
>
> I’m not saying that you shouldn’t discuss this topic or maybe even develop
> a proof of concept. It will probably not be added to Flink in the
> foreseeable future because of lack of committer bandwidth, though. I hope
> this is not too discouraging.
>
> Aljoscha

RE: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

Georgi Stoyanov
Hi Vino,

I was investigating the current architecture, and AFAIK the first proposal will be a lot easier to implement, because the JM already has the information about the states (where they live, which ones exist, etc.) thanks to KvStateLocationRegistry. Correct me if I'm wrong.

We are using the feature, and it's indeed not very cool to iterate through ports, check which TM is the responsible one, and so on.
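
To make the point concrete, here is a minimal sketch of the kind of central route table discussed in this thread: a registry that maps a state name and key group to a TaskManager, and a TaskManager to its proxy address, so a client no longer has to probe ports itself. Everything in it is an assumption for illustration: `RouteTableSketch`, `RouteTable`, `ProxyAddress`, and `keyGroupFor` are hypothetical names, not Flink classes, the hashing is a simplification, and the host names and port are example values.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical sketch; none of these types exist in Flink. */
public class RouteTableSketch {

    /** Host and port of the queryable state proxy on one TaskManager. */
    static final class ProxyAddress {
        final String host;
        final int port;

        ProxyAddress(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Central registry: (state name, key group) -> TaskManager -> proxy address. */
    static final class RouteTable {
        private final Map<String, Map<Integer, String>> stateToTaskManager = new HashMap<>();
        private final Map<String, ProxyAddress> taskManagerToProxy = new HashMap<>();

        void registerTaskManager(String taskManagerId, ProxyAddress address) {
            taskManagerToProxy.put(taskManagerId, address);
        }

        void registerState(String stateName, int keyGroup, String taskManagerId) {
            stateToTaskManager
                    .computeIfAbsent(stateName, name -> new HashMap<>())
                    .put(keyGroup, taskManagerId);
        }

        /** Resolves the proxy to contact, or empty if the state or TaskManager is unknown. */
        Optional<ProxyAddress> lookup(String stateName, int keyGroup) {
            Map<Integer, String> byKeyGroup = stateToTaskManager.get(stateName);
            if (byKeyGroup == null) {
                return Optional.empty();
            }
            return Optional.ofNullable(byKeyGroup.get(keyGroup)).map(taskManagerToProxy::get);
        }
    }

    /** Simplified key-group assignment for illustration; not Flink's actual hashing. */
    static int keyGroupFor(Object key, int numKeyGroups) {
        return Math.floorMod(key.hashCode(), numKeyGroups);
    }

    public static void main(String[] args) {
        RouteTable table = new RouteTable();
        table.registerTaskManager("tm-1", new ProxyAddress("host-a", 9069));
        table.registerTaskManager("tm-2", new ProxyAddress("host-b", 9069));
        table.registerState("word-counts", 0, "tm-1");
        table.registerState("word-counts", 1, "tm-2");

        // The client only supplies the state name and key; routing happens centrally.
        int keyGroup = keyGroupFor("flink", 2);
        System.out.println("key group " + keyGroup + " -> " + table.lookup("word-counts", keyGroup));
    }
}
```

With a table like this held in one place, a single lookup replaces the client-side search over TaskManagers and ports. Whether that place is the JM or a separate process is exactly the trade-off between the options in the proposal.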

 

It would be very useful if one of the committers joined the thread and gave us some insight into what's going to happen with this feature.

Kind Regards,
Georgi

From: vino yang <[hidden email]>
Sent: Thursday, April 25, 2019 5:18 PM
To: dev <[hidden email]>; user <[hidden email]>
Cc: Stefan Richter <[hidden email]>; Aljoscha Krettek <[hidden email]>; [hidden email]
Subject: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

 


Re: RE: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi Georgi,

Thanks for your feedback. Glad to hear you are using queryable state.

I agree that option 1 is easier to implement than the others. However, when we design the new architecture we need to consider more aspects, e.g. scalability, so option 3 seems more suitable. Actually, some committers such as Stefan, Gordon and Aljoscha have given me feedback and direction.

I am currently writing the design document. Once it is ready to be presented, I will post it to this thread and we can discuss further details.

----
Best,
Vino


On 2019-06-07 19:03, [hidden email] wrote:

Hi Vino,

I was investigating the current architecture, and AFAIK the first proposal will be a lot easier to implement, because the JM already has the information about the states (where they live, which ones exist, etc.) thanks to KvStateLocationRegistry. Correct me if I'm wrong.

We are using the feature, and it's indeed not very cool to iterate through ports, check which TM is the responsible one, and so on.

It would be very useful if one of the committers joined the thread and gave us some insight into what's going to happen with this feature.

Kind Regards,
Georgi



Re: RE: [DISCUSS] Improve Queryable State and introduce a QueryServerProxy component

vino yang
Hi all,

I have further refined the design discussed in this thread and written a design document with more detailed diagrams and descriptions, to make the discussion easier. [1]

Note: The document is not yet complete; for example, the "Implementation" section is missing. It is therefore still open for discussion. I will fill in the rest while listening to the community's opinions.

More discussion and feedback are welcome and appreciated.

Best,
Vino



yanghua1127 <[hidden email]> wrote on Fri, Jun 7, 2019 at 11:32 PM:
Hi Georgi,

Thanks for your feedback. Glad to hear you are using queryable state.

I agree that option 1 is easier to implement than the others. However, when we design the new architecture we need to consider more aspects, e.g. scalability, so option 3 seems more suitable. Actually, some committers such as Stefan, Gordon and Aljoscha have given me feedback and direction.

I am currently writing the design document. Once it is ready to be presented, I will post it to this thread and we can discuss further details.

----
Best,
Vino

