[DISCUSS] FLIP-84 Feedback Summary

[DISCUSS] FLIP-84 Feedback Summary

godfreyhe
Hi community,
Timo, Fabian and Dawid have given some feedback about FLIP-84 [1]. The
feedback is all about the newly introduced methods. We had a discussion
yesterday, and most of the feedback has been agreed upon. Here are the
conclusions:

*1. about proposed methods in `TableEnvironment`:*

the original proposed methods:

TableEnvironment.createDmlBatch(): DmlBatch
TableEnvironment.executeStatement(String statement): ResultTable

the new proposed methods:

// we should not use abbreviations in the API, and the term "Batch" is easily confused with batch/streaming processing
TableEnvironment.createStatementSet(): StatementSet

// every method that takes SQL should have `Sql` in its name
// supports multiline statement ???
TableEnvironment.executeSql(String statement): TableResult

// new method. supports explaining DQL and DML
TableEnvironment.explainSql(String statement, ExplainDetail... details): String
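As an illustration of the proposed naming (a sketch only: the classes below are minimal mocks invented for this example, not the real Flink API, and the ExplainDetail values are made up), the two entry points could be exercised like this:

```java
import java.util.Arrays;

public class TableEnvSketch {
    enum ExplainDetail { CHANGELOG_MODE, ESTIMATED_COST }

    // Minimal mock of the proposed entry points, for illustration only.
    static class TableEnvironment {
        String executeSql(String statement) {
            // A real implementation would parse and execute; we just echo.
            return "executed: " + statement;
        }

        String explainSql(String statement, ExplainDetail... details) {
            return "plan for [" + statement + "] with details "
                    + Arrays.toString(details);
        }
    }

    public static void main(String[] args) {
        TableEnvironment env = new TableEnvironment();
        System.out.println(env.executeSql("CREATE TABLE t (a INT)"));
        System.out.println(env.explainSql("SELECT a FROM t",
                ExplainDetail.CHANGELOG_MODE));
    }
}
```

Note how the `Sql` suffix marks exactly the methods that accept a SQL string, while `createStatementSet()` takes no SQL at all.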


*2. about proposed related classes:*

the original proposed classes:

interface DmlBatch {
    void addInsert(String insert);
    void addInsert(String targetPath, Table table);
    ResultTable execute() throws Exception;
    String explain(boolean extended);
}

public interface ResultTable {
    TableSchema getResultSchema();
    Iterable<Row> getResultRows();
}

the new proposed classes:

interface StatementSet {
    // every method that takes SQL should have `Sql` in its name
    // return StatementSet instance for fluent programming
    addInsertSql(String statement): StatementSet

    // return StatementSet instance for fluent programming
    addInsert(String tablePath, Table table): StatementSet

    // new method. supports overwrite mode
    addInsert(String tablePath, Table table, boolean overwrite): StatementSet

    explain(): String

    // new method. supports adding more details for the result
    explain(ExplainDetail... extraDetails): String

    // throw exception ???
    execute(): TableResult
}

interface TableResult {
    getTableSchema(): TableSchema

    // avoid custom parsing of an "OK" row in programming
    getResultKind(): ResultKind

    // instead of `get`, make it explicit that this might trigger an expensive operation
    collect(): Iterable<Row>

    // for fluent programming
    print(): Unit
}

enum ResultKind {
    SUCCESS, // for DDL, DCL and statements with a simple "OK"
    SUCCESS_WITH_CONTENT, // rows with important content are available (DML, DQL)
}
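To illustrate the fluent style and the ResultKind idea, here is a minimal self-contained mock (only the method names follow the proposal; the bodies are invented for this sketch, and the real Flink classes will differ):

```java
import java.util.ArrayList;
import java.util.List;

public class StatementSetSketch {
    enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    static class TableResult {
        private final ResultKind kind;
        TableResult(ResultKind kind) { this.kind = kind; }
        // Lets callers branch on the outcome without parsing an "OK" row.
        ResultKind getResultKind() { return kind; }
    }

    static class StatementSet {
        private final List<String> statements = new ArrayList<>();

        // Returns this for fluent chaining, as proposed.
        StatementSet addInsertSql(String statement) {
            statements.add(statement);
            return this;
        }

        String explain() {
            return "== Plan for " + statements.size() + " statement(s) ==";
        }

        TableResult execute() {
            // A real implementation would submit one job for all statements.
            return new TableResult(ResultKind.SUCCESS_WITH_CONTENT);
        }
    }

    public static void main(String[] args) {
        TableResult result = new StatementSet()
                .addInsertSql("INSERT INTO sink1 SELECT * FROM src")
                .addInsertSql("INSERT INTO sink2 SELECT * FROM src")
                .execute();
        System.out.println(result.getResultKind());
    }
}
```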


*3. new proposed methods in `Table`*

`Table.insertInto()` will be deprecated, and the following methods are
introduced:

Table.executeInsert(String tablePath): TableResult
Table.executeInsert(String tablePath, boolean overwrite): TableResult
Table.explain(ExplainDetail... details): String
Table.execute(): TableResult

There are two issues that need further discussion: one is whether
`TableEnvironment.executeSql(String statement): TableResult` needs to
support multiline statements (or whether `TableEnvironment` needs to
support multiline statements at all), and the other is whether
`StatementSet.execute()` needs to throw an exception.

Please refer to the feedback document [2] for the details.

Any suggestions are warmly welcomed!

[1]
https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2]
https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit

Best,
Godfrey

Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Godfrey,

thanks for starting the discussion on the mailing list. And sorry again
for the late reply to FLIP-84. I have updated the Google doc one more
time to incorporate the offline discussions.

From Dawid's and my view, it is fine to postpone the multiline support to
a separate method. This can be future work, even though we will need it
rather soon.

If there are no objections, I suggest updating FLIP-84 again and having
another voting process.

Thanks,
Timo


On 25.03.20 11:17, godfrey he wrote:


Re: [DISCUSS] FLIP-84 Feedback Summary

Jark Wu-2
Hi Godfrey,

The changes sound good to me. +1 to start another vote.

A minor question: does the ResultKind contain an ERROR kind?

Best,
Jark


On Wed, 25 Mar 2020 at 18:51, Timo Walther <[hidden email]> wrote:


Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Jark,

good question. Actually, there was an ERROR kind that could have been
enabled via a config option, so that everything ends up in the
TableResult. But @Kurt had some concerns, which is why we didn't add this
kind of result yet.

Regards,
Timo


On 25.03.20 12:00, Jark Wu wrote:



Re: [DISCUSS] FLIP-84 Feedback Summary

godfreyhe
In reply to this post by Timo Walther-2
Hi Timo,

Thanks for the update.

Regarding "multiline statement support", I'm also fine with
`TableEnvironment.executeSql()` only supporting single statements; we can
support multiline statements later (this needs more discussion).

Regarding "StatementSet.explain()", I don't have strong opinions about
that.

Regarding "TableResult.getJobClient()", I think it's unnecessary. The
reasons are: first, many statements (e.g. DDL, show xx, use xx) will not
submit a Flink job; second, `TableEnvironment.executeSql()` and
`StatementSet.execute()` are synchronous methods, so `TableResult` will
only be returned after the job has finished or failed.

Regarding "whether StatementSet.execute() needs to throw an exception", I
think we should choose a unified way to tell whether the execution was
successful. Either `TableResult` contains an ERROR kind (a non-runtime
exception), in which case users need to not only check the result but
also catch runtime exceptions in their code; or `StatementSet.execute()`
does not throw any exception (including runtime exceptions) and all
exception messages are in the result. I prefer "StatementSet.execute()
needs to throw an exception". cc @Jark Wu <[hidden email]>
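A small self-contained sketch of the throwing style Godfrey prefers (MockStatementSet is an invented stand-in, not the proposed interface; it only contrasts the two error-signalling options):

```java
public class ErrorStyleSketch {
    static class MockStatementSet {
        private final boolean fail;
        MockStatementSet(boolean fail) { this.fail = fail; }

        // Throwing style: failures surface as exceptions, so success is
        // simply "no exception was thrown" -- one unified signal.
        String execute() {
            if (fail) {
                throw new RuntimeException("job failed");
            }
            return "SUCCESS";
        }
    }

    public static void main(String[] args) {
        try {
            System.out.println(new MockStatementSet(false).execute());
            new MockStatementSet(true).execute();
        } catch (RuntimeException e) {
            // With an ERROR result kind instead, this branch would move into
            // a check on the result, and callers would need both code paths.
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```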

I will update the agreed parts to the document first.

Best,
Godfrey



Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Godfrey,

having control over the job after submission is a frequently requested
requirement (some examples are [1], [2]). Users would like to get
insights into the running or completed job, including the jobId,
jobGraph, etc.; the JobClient summarizes these properties.

It is good to have a discussion about synchronous/asynchronous
submission now to have a complete execution picture.

I thought we mostly submit streaming queries asynchronously and just wait
for the successful submission. If we block for streaming queries, how can we
collect() or print() results?

Also, if we block for streaming queries, we could never support multiline
files, because the first INSERT INTO would block the further execution.

If we decide to block entirely on streaming queries, we need the async
execution methods in the design already. However, I would rather go for
non-blocking streaming queries, also with the `EMIT STREAM` keyword in
mind that we might add to SQL statements soon.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-16761
[2] https://issues.apache.org/jira/browse/FLINK-12214
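A minimal sketch of the non-blocking model described above (JobHandle is an invented stand-in for a JobClient-like handle; the real API may differ): submission returns as soon as the job is running, and the handle lets the caller track or await it.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSubmitSketch {
    static class JobHandle {
        final String jobId;
        final CompletableFuture<String> completion;
        JobHandle(String jobId, CompletableFuture<String> completion) {
            this.jobId = jobId;
            this.completion = completion;
        }
    }

    // Returns immediately after "submission"; the job keeps running.
    static JobHandle submit(String statement) {
        CompletableFuture<String> done =
                CompletableFuture.supplyAsync(() -> "FINISHED: " + statement);
        return new JobHandle("job-1", done);
    }

    public static void main(String[] args) {
        JobHandle handle = submit("INSERT INTO sink SELECT * FROM src");
        // Because submit() did not block, we can inspect the job here
        // (or start collecting/printing results, or submit further
        // statements from a multiline file).
        System.out.println("submitted " + handle.jobId);
        System.out.println(handle.completion.join());
    }
}
```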

On 25.03.20 16:30, godfrey he wrote:



Re: [DISCUSS] FLIP-84 Feedback Summary

godfreyhe
Hi Timo,

I agree with you that streaming queries mostly need async execution.
In fact, our original plan was to introduce only sync methods in this
FLIP, with async methods (like "executeSqlAsync") to be introduced in the
future, as mentioned in the appendix.

Maybe the async methods also need to be considered in this FLIP.

I think sync methods are also useful for streaming, as they can be used
to run bounded sources. Maybe we should check whether all sources are
bounded in sync execution mode.
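A sketch of such a check (the Source type here is invented for illustration; the real check would inspect the pipeline's connector properties): block in sync mode only if every source is bounded.

```java
import java.util.Arrays;
import java.util.List;

public class BoundedCheckSketch {
    static class Source {
        final String name;
        final boolean bounded;
        Source(String name, boolean bounded) {
            this.name = name;
            this.bounded = bounded;
        }
    }

    // Sync execution can only terminate if every source is bounded.
    static boolean safeToExecuteSync(List<Source> sources) {
        return sources.stream().allMatch(s -> s.bounded);
    }

    public static void main(String[] args) {
        List<Source> pipeline = Arrays.asList(
                new Source("kafka_orders", false),
                new Source("csv_rates", true));
        if (!safeToExecuteSync(pipeline)) {
            System.out.println("unbounded source present; "
                    + "a blocking execute() would never return");
        }
    }
}
```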

> Also, if we block for streaming queries, we could never support
> multiline files. Because the first INSERT INTO would block the further
> execution.
Agreed; we need an async method to submit multiline files, and such files
should be restricted so that, for streaming, DQL and DML always come at
the end.

Best,
Godfrey

On Thu, Mar 26, 2020 at 4:29 PM, Timo Walther <[hidden email]> wrote:

> Hi Godfrey,
>
> having control over the job after submission is a requirement that was
> requested frequently (some examples are [1], [2]). Users would like to
> get insights about the running or completed job. Including the jobId,
> jobGraph etc., the JobClient summarizes these properties.
>
> It is good to have a discussion about synchronous/asynchronous
> submission now to have a complete execution picture.
>
> I thought we submit streaming queries mostly async and just wait for the
> successful submission. If we block for streaming queries, how can we
> collect() or print() results?
>
> Also, if we block for streaming queries, we could never support
> multiline files. Because the first INSERT INTO would block the further
> execution.
>
> If we decide to block entirely on streaming queries, we need the async
> execution methods in the design already. However, I would rather go for
> non-blocking streaming queries. Also with the `EMIT STREAM` key word in
> mind that we might add to SQL statements soon.
>
> Regards,
> Timo
>
> [1] https://issues.apache.org/jira/browse/FLINK-16761
> [2] https://issues.apache.org/jira/browse/FLINK-12214
>
> On 25.03.20 16:30, godfrey he wrote:
> > Hi Timo,
> >
> > Thanks for the update.
> >
> > Regarding "multiline statement support", I'm also fine with
> > `TableEnvironment.executeSql()` supporting only single statements, and
> > we can support multiline statements later (this needs more discussion).
> >
> > Regarding "StatementSet.explain()", I don't have strong opinions about
> > that.
> >
> > Regarding "TableResult.getJobClient()", I think it's unnecessary. The
> > reasons are: first, many statements (e.g. DDL, SHOW xx, USE xx) will not
> > submit a Flink job; second, `TableEnvironment.executeSql()` and
> > `StatementSet.execute()` are synchronous methods, so `TableResult` will
> > be returned only after the job has finished or failed.
> >
> > Regarding "whether StatementSet.execute() needs to throw exceptions", I
> > think we should choose one unified way to tell whether the execution was
> > successful. Either `TableResult` contains an ERROR kind (instead of an
> > exception), in which case users would need to not only check the result
> > but also catch runtime exceptions in their code; or
> > `StatementSet.execute()` throws no exceptions at all (including runtime
> > ones) and all error messages are placed in the result. I prefer that
> > `StatementSet.execute()` throw exceptions. cc @Jark Wu <[hidden email]>
> >
> > I will update the agreed parts to the document first.
> >
> > Best,
> > Godfrey
> >
> >
> > Timo Walther <[hidden email]> wrote on Wednesday, 25 March 2020 at 18:51:
> >
> >> Hi Godfrey,
> >>
> >> thanks for starting the discussion on the mailing list. And sorry again
> >> for the late reply to FLIP-84. I have updated the Google doc one more
> >> time to incorporate the offline discussions.
> >>
> >> From Dawid's and my view, it is fine to postpone the multiline support
> >> to a separate method. This can be future work even though we will need
> >> it rather soon.
> >>
> >> If there are no objections, I suggest updating FLIP-84 again and
> >> having another voting process.
> >>
> >> Thanks,
> >> Timo
> >>
> >>
> >> On 25.03.20 11:17, godfrey he wrote:
> >>> Hi community,
> >>> Timo, Fabian and Dawid have some feedback about FLIP-84 [1]. The
> >>> feedback is all about the newly introduced methods. We had a
> >>> discussion yesterday, and most of the feedback has been agreed upon.
> >>> Here are the conclusions:
> >>>
> >>> *1. about proposed methods in `TableEnvironment`:*
> >>>
> >>> the original proposed methods:
> >>>
> >>> TableEnvironment.createDmlBatch(): DmlBatch
> >>> TableEnvironment.executeStatement(String statement): ResultTable
> >>>
> >>> the new proposed methods:
> >>>
> >>> // we should not use abbreviations in the API, and the term "Batch" is
> >>> easily confused with batch/streaming processing
> >>> TableEnvironment.createStatementSet(): StatementSet
> >>>
> >>> // every method that takes SQL should have `Sql` in its name
> >>> // supports multiline statement ???
> >>> TableEnvironment.executeSql(String statement): TableResult
> >>>
> >>> // new methods. supports explaining DQL and DML
> >>> TableEnvironment.explainSql(String statement, ExplainDetail...
> details):
> >>> String
> >>>
> >>>
> >>> *2. about proposed related classes:*
> >>>
> >>> the original proposed classes:
> >>>
> >>> interface DmlBatch {
> >>>       void addInsert(String insert);
> >>>       void addInsert(String targetPath, Table table);
> >>>       ResultTable execute() throws Exception;
> >>>       String explain(boolean extended);
> >>> }
> >>>
> >>> public interface ResultTable {
> >>>       TableSchema getResultSchema();
> >>>       Iterable<Row> getResultRows();
> >>> }
> >>>
> >>> the new proposed classes:
> >>>
> >>> interface StatementSet {
> >>>       // every method that takes SQL should have `Sql` in its name
> >>>       // return StatementSet instance for fluent programming
> >>>       addInsertSql(String statement): StatementSet
> >>>
> >>>       // return StatementSet instance for fluent programming
> >>>       addInsert(String tablePath, Table table): StatementSet
> >>>
> >>>       // new method. support overwrite mode
> >>>       addInsert(String tablePath, Table table, boolean overwrite):
> >>> StatementSet
> >>>
> >>>       explain(): String
> >>>
> >>>       // new method. supports adding more details for the result
> >>>       explain(ExplainDetail... extraDetails): String
> >>>
> >>>       // throw exception ???
> >>>       execute(): TableResult
> >>> }
> >>>
> >>> interface TableResult {
> >>>       getTableSchema(): TableSchema
> >>>
> >>>       // avoid custom parsing of an "OK" row in programming
> >>>       getResultKind(): ResultKind
> >>>
> >>>       // instead of `get`, make it explicit that this might be
> >>>       // triggering an expensive operation
> >>>       collect(): Iterable<Row>
> >>>
> >>>       // for fluent programming
> >>>       print(): Unit
> >>> }
> >>>
> >>> enum ResultKind {
> >>>       SUCCESS, // for DDL, DCL and statements with a simple "OK"
> >>>       SUCCESS_WITH_CONTENT, // rows with important content are
> available
> >>> (DML, DQL)
> >>> }
> >>>
> >>>
> >>> *3. new proposed methods in `Table`*
> >>>
> >>> `Table.insertInto()` will be deprecated, and the following methods are
> >>> introduced:
> >>>
> >>> Table.executeInsert(String tablePath): TableResult
> >>> Table.executeInsert(String tablePath, boolean overwrite): TableResult
> >>> Table.explain(ExplainDetail... details): String
> >>> Table.execute(): TableResult
> >>>
> >>> There are two issues that need further discussion: one is whether
> >>> `TableEnvironment.executeSql(String statement): TableResult` needs to
> >>> support multiline statements (or whether `TableEnvironment` needs to
> >>> support multiline statements at all), and the other is whether
> >>> `StatementSet.execute()` needs to throw exceptions.
> >>>
> >>> please refer to the feedback document [2] for the details.
> >>>
> >>> Any suggestions are warmly welcomed!
> >>>
> >>> [1]
> >>>
> >>
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>> [2]
> >>>
> >>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>
> >>
> >
>
>

Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Godfrey,

executing streaming queries must be our top priority because this is
what distinguishes Flink from competitors. If we change the execution
behavior, we should think about the other cases as well to not break the
API a third time.

I fear that just having an async execute method will not be enough
because users should be able to mix streaming and batch queries in a
unified scenario.

If I remember it correctly, we had some discussions in the past about
what decides about the execution mode of a query. Currently, we would
like to let the query decide, not derive it from the sources.

So I could imagine a multiline pipeline as:

USE CATALOG 'mycat';
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

For executeMultilineSql():

sync because regular SQL
sync because regular Batch SQL
async because Streaming SQL

For executeAsyncMultilineSql():

async because everything should be async
async because everything should be async
async because everything should be async

What we should not do for executeAsyncMultilineSql():

sync because DDL
async because everything should be async
async because everything should be async
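The per-statement decisions in the lists above could be sketched as a tiny classifier. This is a standalone illustration only; `modeFor` and the `EMIT STREAM` string check are placeholders for real query analysis, not anything from the FLIP:

```java
public class ExecutionModeSketch {
    enum Mode { SYNC, ASYNC }

    // Hypothetical classifier: the query itself (here, an EMIT STREAM
    // clause), rather than its sources, decides the execution mode.
    static Mode modeFor(String statement) {
        String s = statement.trim().toUpperCase();
        if (s.contains("EMIT STREAM")) {
            return Mode.ASYNC; // streaming SQL: submit and return immediately
        }
        return Mode.SYNC;      // DDL and regular batch SQL: block until done
    }

    public static void main(String[] args) {
        System.out.println(modeFor("USE CATALOG 'mycat'"));            // SYNC
        System.out.println(modeFor("INSERT INTO t1 SELECT * FROM s")); // SYNC
        System.out.println(
            modeFor("INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM")); // ASYNC
    }
}
```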

What are your thoughts here?

Regards,
Timo


On 26.03.20 12:50, godfrey he wrote:



Re: [DISCUSS] FLIP-84 Feedback Summary

godfreyhe
Hi Timo,

Agree with you that streaming queries are our top priority,
but I think there are too many things that need to be discussed for
multiline statements, e.g.:
1. What's the behavior of mixing DDL and DML under async execution?
create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; // t1 may be a MySQL table, the data will also be deleted.

Here t1 may be dropped while the "insert" job is still running.

2. What's the behavior of a unified scenario under async execution (as you
mentioned)?
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is nondeterministic, because the first
statement may still be running.
I think we would need to put a lot of effort into defining the behavior of
logically related queries.

In this FLIP, I suggest we handle only single statements, and also
introduce async execute methods, which are more important and more
frequently used by users.

For the sync methods (like `TableEnvironment.executeSql` and
`StatementSet.execute`), the result will not be returned until the job is
finished. The following methods will be introduced in this FLIP:

/**
 * Asynchronously executes the given single statement.
 */
TableEnvironment.executeSqlAsync(String statement): TableResult

/**
 * Asynchronously executes the DML statements as a batch.
 */
StatementSet.executeAsync(): TableResult

public interface TableResult {
   /**
    * Returns the JobClient for DQL and DML in async mode; otherwise
    * returns Optional.empty().
    */
   Optional<JobClient> getJobClient();
}
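As a standalone illustration of the proposed `getJobClient()` contract: only async DQL/DML carries a JobClient, while DDL (and sync execution) returns `Optional.empty()`. All types below are simplified stand-ins, not Flink's real interfaces:

```java
import java.util.Optional;

public class TableResultSketch {
    interface JobClient { String getJobId(); }

    static final class TableResult {
        private final JobClient jobClient; // null for DDL / sync execution
        TableResult(JobClient jobClient) { this.jobClient = jobClient; }
        Optional<JobClient> getJobClient() {
            return Optional.ofNullable(jobClient);
        }
    }

    // DDL never submits a Flink job, so there is no JobClient to return.
    static TableResult executeDdl() { return new TableResult(null); }

    // Async DML submits a job and exposes a handle to it.
    static TableResult executeDmlAsync() {
        return new TableResult(() -> "job-42");
    }

    public static void main(String[] args) {
        System.out.println(executeDdl().getJobClient().isPresent());      // false
        System.out.println(executeDmlAsync().getJobClient().isPresent()); // true
    }
}
```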

what do you think?

Best,
Godfrey

Timo Walther <[hidden email]> wrote on Thursday, 26 March 2020 at 21:15:

>

Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Godfrey,

maybe I didn't express my biggest concern clearly enough in my last mail.
Even for single-line, synchronous execution, I think that streaming
queries should not block. Otherwise it is not possible to call collect()
or print() on them afterwards.

"there are too many things need to discuss for multiline":

True, I don't want to solve all of them right now. But what I know is
that our newly introduced methods should fit into a multiline execution.
There is no big difference between calling `executeSql(A), executeSql(B)`
and processing a multiline file `A;\nB;`.

I think the example that you mentioned can simply be undefined for now.
Currently, no catalog is modifying data but just metadata. This is a
separate discussion.

"result of the second statement is nondeterministic":

Sure, this is nondeterministic. But that is the implementer's fault, and we
cannot forbid such pipelines.

How about we always execute streaming queries async? It would unblock
executeSql() and multiline statements.
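A toy, Flink-free illustration of why non-blocking submission matters for collect(): the "job" keeps producing rows on another thread while the caller consumes them. If submission blocked until the (possibly unbounded) job finished, the consuming code would never run. Every name here is a stand-in, not Flink code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class AsyncCollectSketch {
    static int runAndCollect() throws Exception {
        BlockingQueue<Integer> rows = new ArrayBlockingQueue<>(10);

        // "Submit" the streaming job: start() returns immediately,
        // the work continues asynchronously on its own thread.
        Thread job = new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                try {
                    rows.put(i);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        job.start();

        // collect(): consume results while the job is still running.
        int sum = 0;
        for (int i = 0; i < 5; i++) {
            sum += rows.take();
        }
        job.join();
        return sum;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCollect()); // 10 (0 + 1 + 2 + 3 + 4)
    }
}
```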

Having an `executeSqlAsync()` is useful for batch. However, I don't want
`sync/async` to become the new batch/stream flag. The execution behavior
should come from the query itself.

Regards,
Timo


On 30.03.20 11:12, godfrey he wrote:

> Hi Timo,
>
> Agree with you that streaming queries is our top priority,
> but I think there are too many things need to discuss for multiline
> statements:
> e.g.
> 1. what's the behaivor of DDL and DML mixing for async execution:
> create table t1 xxx;
> create table t2 xxx;
> insert into t2 select * from t1 where xxx;
> drop table t1; // t1 may be a MySQL table, the data will also be deleted.
>
> t1 is dropped when "insert" job is running.
>
> 2. what's the behaivor of unified scenario for async execution: (as you
> mentioned)
> INSERT INTO t1 SELECT * FROM s;
> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>
> The result of the second statement is indeterministic, because the first
> statement maybe is running.
> I think we need to put a lot of effort to define the behavior of logically
> related queries.
>
> In this FLIP, I suggest we only handle single statement, and we also
> introduce an async execute method
> which is more important and more often used for users.
>
> Dor the sync methods (like `TableEnvironment.executeSql` and
> `StatementSet.execute`),
> the result will be returned until the job is finished. The following
> methods will be introduced in this FLIP:
>
>   /**
>    * Asynchronously execute the given single statement
>    */
> TableEnvironment.executeSqlAsync(String statement): TableResult
>
> /**
>   * Asynchronously execute the dml statements as a batch
>   */
> StatementSet.executeAsync(): TableResult
>
> public interface TableResult {
>     /**
>      * return JobClient for DQL and DML in async mode, else return
> Optional.empty
>      */
>     Optional<JobClient> getJobClient();
> }
>
> what do you think?
>
> Best,
> Godfrey
>
> Timo Walther <[hidden email]> 于2020年3月26日周四 下午9:15写道:
>
>> Hi Godfrey,
>>
>> executing streaming queries must be our top priority because this is
>> what distinguishes Flink from competitors. If we change the execution
>> behavior, we should think about the other cases as well to not break the
>> API a third time.
>>
>> I fear that just having an async execute method will not be enough
>> because users should be able to mix streaming and batch queries in a
>> unified scenario.
>>
>> If I remember it correctly, we had some discussions in the past about
>> what decides about the execution mode of a query. Currently, we would
>> like to let the query decide, not derive it from the sources.
>>
>> So I could imagine a multiline pipeline as:
>>
>> USE CATALOG 'mycat';
>> INSERT INTO t1 SELECT * FROM s;
>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>
>> For executeMultilineSql():
>>
>> sync because regular SQL
>> sync because regular Batch SQL
>> async because Streaming SQL
>>
>> For executeAsyncMultilineSql():
>>
>> async because everything should be async
>> async because everything should be async
>> async because everything should be async
>>
>> What we should not do for executeAsyncMultilineSql():
>>
>> sync because DDL
>> async because everything should be async
>> async because everything should be async
>>
>> What are your thoughts here?
>>
>> Regards,
>> Timo
>>
>>
>> On 26.03.20 12:50, godfrey he wrote:
>>> Hi Timo,
>>>
>>> I agree with you that streaming queries mostly need async execution.
>>> In fact, our original plan was to only introduce sync methods in this FLIP,
>>> and async methods (like "executeSqlAsync") will be introduced in the
>>> future, as mentioned in the appendix.
>>>
>>> Maybe the async methods also need to be considered in this FLIP.
>>>
>>> I think sync methods are also useful for streaming, where they can be used
>>> to run bounded sources.
>>> Maybe we should check whether all sources are bounded in sync execution
>>> mode.
>>>
>>>> Also, if we block for streaming queries, we could never support
>>>> multiline files. Because the first INSERT INTO would block the further
>>>> execution.
>>> Agree with you; we need an async method to submit multiline files,
>>> and such files should be limited so that DQL and DML always come at the
>>> end for streaming.
>>>
>>> Best,
>>> Godfrey
>>>
>>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 4:29 PM:
>>>
>>>> Hi Godfrey,
>>>>
>>>> having control over the job after submission is a requirement that was
>>>> requested frequently (some examples are [1], [2]). Users would like to
>>>> get insights about the running or completed job. The JobClient summarizes
>>>> these properties, including the jobId, jobGraph, etc.
>>>>
>>>> It is good to have a discussion about synchronous/asynchronous
>>>> submission now to have a complete execution picture.
>>>>
>>>> I thought we submit streaming queries mostly async and just wait for the
>>>> successful submission. If we block for streaming queries, how can we
>>>> collect() or print() results?
>>>>
>>>> Also, if we block for streaming queries, we could never support
>>>> multiline files. Because the first INSERT INTO would block the further
>>>> execution.
>>>>
>>>> If we decide to block entirely on streaming queries, we need the async
>>>> execution methods in the design already. However, I would rather go for
>>>> non-blocking streaming queries. Also with the `EMIT STREAM` keyword in
>>>> mind that we might add to SQL statements soon.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
>>>>
>>>> On 25.03.20 16:30, godfrey he wrote:
>>>>> Hi Timo,
>>>>>
>>>>> Thanks for the update.
>>>>>
>>>>> Regarding "multiline statement support", I'm also fine that
>>>>> `TableEnvironment.executeSql()` only supports single-line statements, and we
>>>>> can support multiline statements later (this needs more discussion).
>>>>>
>>>>> Regarding "StatementSet.explain()", I don't have strong opinions about
>>>>> that.
>>>>>
>>>>> Regarding "TableResult.getJobClient()", I think it's unnecessary. The
>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx) will not
>>>>> submit a Flink job. Second, `TableEnvironment.executeSql()` and
>>>>> `StatementSet.execute()` are synchronous methods, so `TableResult` will be
>>>>> returned only after the job is finished or failed.
>>>>>
>>>>> Regarding "whether StatementSet.execute() needs to throw an exception", I
>>>>> think we should choose a unified way to tell whether the execution is
>>>>> successful. If `TableResult` contains an ERROR kind (instead of a runtime
>>>>> exception), users need to not only check the result but also catch runtime
>>>>> exceptions in their code; or `StatementSet.execute()` does not throw any
>>>>> exception (including runtime exceptions) and all exception messages are in the
>>>>> result. I prefer "StatementSet.execute() needs to throw an exception". cc @Jark
>>>>> Wu <[hidden email]>
>>>>>
>>>>> I will update the document with the agreed parts first.
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>>
>>>>> Timo Walther <[hidden email]> wrote on Wed, Mar 25, 2020 at 6:51 PM:
>>>>>
>>>>>> Hi Godfrey,
>>>>>>
>>>>>> thanks for starting the discussion on the mailing list. And sorry
>> again
>>>>>> for the late reply to FLIP-84. I have updated the Google doc one more
>>>>>> time to incorporate the offline discussions.
>>>>>>
>>>>>> From Dawid's and my view, it is fine to postpone the multiline support
>>>>>> to a separate method. This can be future work even though we will need
>>>>>> it rather soon.
>>>>>>
>>>>>> If there are no objections, I suggest updating FLIP-84 again and
>>>>>> having another voting process.
>>>>>>
>>>>>> Thanks,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 25.03.20 11:17, godfrey he wrote:
>>>>>>> Hi community,
>>>>>>> Timo, Fabian and Dawid have some feedback about FLIP-84[1]. The feedback
>>>>>>> is all about the newly introduced methods. We had a discussion yesterday, and
>>>>>>> most of the feedback has been agreed upon. Here are the conclusions:
>>>>>>>
>>>>>>> *1. about proposed methods in `TableEnvironment`:*
>>>>>>>
>>>>>>> the original proposed methods:
>>>>>>>
>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
>>>>>>> TableEnvironment.executeStatement(String statement): ResultTable
>>>>>>>
>>>>>>> the new proposed methods:
>>>>>>>
>>>>>>> // we should not use abbreviations in the API, and the term "Batch"
>> is
>>>>>>> easily confused with batch/streaming processing
>>>>>>> TableEnvironment.createStatementSet(): StatementSet
>>>>>>>
>>>>>>> // every method that takes SQL should have `Sql` in its name
>>>>>>> // supports multiline statement ???
>>>>>>> TableEnvironment.executeSql(String statement): TableResult
>>>>>>>
>>>>>>> // new methods. supports explaining DQL and DML
>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
>>>> details):
>>>>>>> String
>>>>>>>
>>>>>>>
>>>>>>> *2. about proposed related classes:*
>>>>>>>
>>>>>>> the original proposed classes:
>>>>>>>
>>>>>>> interface DmlBatch {
>>>>>>>         void addInsert(String insert);
>>>>>>>         void addInsert(String targetPath, Table table);
>>>>>>>         ResultTable execute() throws Exception ;
>>>>>>>         String explain(boolean extended);
>>>>>>> }
>>>>>>>
>>>>>>> public interface ResultTable {
>>>>>>>         TableSchema getResultSchema();
>>>>>>>         Iterable<Row> getResultRows();
>>>>>>> }
>>>>>>>
>>>>>>> the new proposed classes:
>>>>>>>
>>>>>>> interface StatementSet {
>>>>>>>         // every method that takes SQL should have `Sql` in its name
>>>>>>>         // return StatementSet instance for fluent programming
>>>>>>>         addInsertSql(String statement): StatementSet
>>>>>>>
>>>>>>>         // return StatementSet instance for fluent programming
>>>>>>>         addInsert(String tablePath, Table table): StatementSet
>>>>>>>
>>>>>>>         // new method. support overwrite mode
>>>>>>>         addInsert(String tablePath, Table table, boolean overwrite):
>>>>>>> StatementSet
>>>>>>>
>>>>>>>         explain(): String
>>>>>>>
>>>>>>>         // new method. supports adding more details for the result
>>>>>>>         explain(ExplainDetail... extraDetails): String
>>>>>>>
>>>>>>>         // throw exception ???
>>>>>>>         execute(): TableResult
>>>>>>> }
>>>>>>>
>>>>>>> interface TableResult {
>>>>>>>         getTableSchema(): TableSchema
>>>>>>>
>>>>>>>         // avoid custom parsing of an "OK" row in programming
>>>>>>>         getResultKind(): ResultKind
>>>>>>>
>>>>>>>         // instead of `get` make it explicit that this is might be
>>>>>> triggering
>>>>>>> an expensive operation
>>>>>>>         collect(): Iterable<Row>
>>>>>>>
>>>>>>>         // for fluent programming
>>>>>>>         print(): Unit
>>>>>>> }
>>>>>>>
>>>>>>> enum ResultKind {
>>>>>>>         SUCCESS, // for DDL, DCL and statements with a simple "OK"
>>>>>>>         SUCCESS_WITH_CONTENT, // rows with important content are
>>>> available
>>>>>>> (DML, DQL)
>>>>>>> }
>>>>>>>
>>>>>>>
>>>>>>> *3. new proposed methods in `Table`*
>>>>>>>
>>>>>>> `Table.insertInto()` will be deprecated, and the following methods
>> are
>>>>>>> introduced:
>>>>>>>
>>>>>>> Table.executeInsert(String tablePath): TableResult
>>>>>>> Table.executeInsert(String tablePath, boolean overwrite): TableResult
>>>>>>> Table.explain(ExplainDetail... details): String
>>>>>>> Table.execute(): TableResult
>>>>>>>
>>>>>>> There are two issues that need further discussion: one is whether
>>>>>>> `TableEnvironment.executeSql(String statement): TableResult` needs to
>>>>>>> support multiline statements (or whether `TableEnvironment` needs to support
>>>>>>> multiline statements), and the other is whether `StatementSet.execute()`
>>>>>>> needs to throw an exception.
>>>>>>>
>>>>>>> please refer to the feedback document [2] for the details.
>>>>>>>
>>>>>>> Any suggestions are warmly welcomed!
>>>>>>>
>>>>>>> [1]
>>>>>>>
>>>>>>
>>>>
>> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>>>>>> [2]
>>>>>>>
>>>>>>
>>>>
>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
>>>>>>>
>>>>>>> Best,
>>>>>>> Godfrey
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>>
>


Re: [DISCUSS] FLIP-84 Feedback Summary

Jark Wu-2
Hi,

I didn't follow the full discussion.
But I share the same concern as Timo that streaming queries should always
be async.
Otherwise, I can imagine it will cause a lot of confusion and problems if
users don't keep the "sync" firmly in mind (e.g. the client hangs).
Besides, streaming mode is still the majority of use cases for Flink and
Flink SQL. We should put usability at a high priority.

Best,
Jark


On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:

> Hi Godfrey,
>
> maybe I didn't express my biggest concern clearly enough in my last mail.
> Even in a single-line, sync execution, I think that streaming queries
> should not block the execution. Otherwise it is not possible to call
> collect() or print() on them afterwards.
>
> "there are too many things that need to be discussed for multiline":
>
> True, I don't want to solve all of them right now. But what I know is
> that our newly introduced methods should fit into a multiline execution.
> There is no big difference between calling `executeSql(A), executeSql(B)` and
> processing a multiline file `A;\nB;`.
>
> I think the example that you mentioned can simply be undefined for now.
> Currently, no catalog is modifying data but just metadata. This is a
> separate discussion.
>
> "result of the second statement is indeterministic":
>
> Sure, this is indeterministic. But this is the implementer's fault and we
> cannot forbid such pipelines.
>
> How about we always execute streaming queries async? It would unblock
> executeSql() and multiline statements.
>
> Having an `executeSqlAsync()` is useful for batch. However, I don't want
> `sync/async` to be the new batch/stream flag. The execution behavior should
> come from the query itself.
>
> Regards,
> Timo
>
>
> On 30.03.20 11:12, godfrey he wrote:
> > Hi Timo,
> >
> > Agree with you that streaming queries are our top priority,
> > but I think there are too many things that need to be discussed for multiline
> > statements:
> > e.g.
> > 1. what's the behavior of DDL and DML mixing for async execution:
> > create table t1 xxx;
> > create table t2 xxx;
> > insert into t2 select * from t1 where xxx;
> > drop table t1; // t1 may be a MySQL table, the data will also be deleted.
> >
> > t1 is dropped while the "insert" job is running.
> >
> > 2. what's the behavior of the unified scenario for async execution (as you
> > mentioned):
> > INSERT INTO t1 SELECT * FROM s;
> > INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >
> > The result of the second statement is indeterministic, because the first
> > statement may still be running.
> > I think we need to put a lot of effort into defining the behavior of
> > logically related queries.
> >
> > In this FLIP, I suggest we only handle a single statement, and we also
> > introduce an async execute method,
> > which is more important and more often used by users.
> >
> > For the sync methods (like `TableEnvironment.executeSql` and
> > `StatementSet.execute`),
> > the result will not be returned until the job is finished. The following
> > methods will be introduced in this FLIP:
> >
> >   /**
> >    * Asynchronously execute the given single statement
> >    */
> > TableEnvironment.executeSqlAsync(String statement): TableResult
> >
> > /**
> >   * Asynchronously execute the dml statements as a batch
> >   */
> > StatementSet.executeAsync(): TableResult
> >
> > public interface TableResult {
> >     /**
> >      * return JobClient for DQL and DML in async mode, else return
> > Optional.empty
> >      */
> >     Optional<JobClient> getJobClient();
> > }
> >
> > what do you think?
> >
> > Best,
> > Godfrey
> >
> > Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 9:15 PM:
> >
> >> Hi Godfrey,
> >>
> >> executing streaming queries must be our top priority because this is
> >> what distinguishes Flink from competitors. If we change the execution
> >> behavior, we should think about the other cases as well to not break the
> >> API a third time.
> >>
> >> I fear that just having an async execute method will not be enough
> >> because users should be able to mix streaming and batch queries in a
> >> unified scenario.
> >>
> >> If I remember it correctly, we had some discussions in the past about
> >> what decides about the execution mode of a query. Currently, we would
> >> like to let the query decide, not derive it from the sources.
> >>
> >> So I could imagine a multiline pipeline as:
> >>
> >> USE CATALOG 'mycat';
> >> INSERT INTO t1 SELECT * FROM s;
> >> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>
> >> For executeMultilineSql():
> >>
> >> sync because regular SQL
> >> sync because regular Batch SQL
> >> async because Streaming SQL
> >>
> >> For executeAsyncMultilineSql():
> >>
> >> async because everything should be async
> >> async because everything should be async
> >> async because everything should be async
> >>
> >> What we should not do for executeAsyncMultilineSql():
> >>
> >> sync because DDL
> >> async because everything should be async
> >> async because everything should be async
> >>
> >> What are your thoughts here?
> >>
> >> Regards,
> >> Timo
> >>
> >>
> >> On 26.03.20 12:50, godfrey he wrote:
> >>> Hi Timo,
> >>>
> >>> I agree with you that streaming queries mostly need async execution.
> >>> In fact, our original plan was to only introduce sync methods in this
> >>> FLIP, and async methods (like "executeSqlAsync") will be introduced in the
> >>> future, as mentioned in the appendix.
> >>>
> >>> Maybe the async methods also need to be considered in this FLIP.
> >>>
> >>> I think sync methods are also useful for streaming, where they can be used
> >>> to run bounded sources.
> >>> Maybe we should check whether all sources are bounded in sync execution
> >>> mode.
> >>>
> >>>> Also, if we block for streaming queries, we could never support
> >>>> multiline files. Because the first INSERT INTO would block the further
> >>>> execution.
> >>> Agree with you; we need an async method to submit multiline files,
> >>> and such files should be limited so that DQL and DML always come at the
> >>> end for streaming.
> >>>
> >>> Best,
> >>> Godfrey
> >>>
> >>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 4:29 PM:
> >>>
> >>>> Hi Godfrey,
> >>>>
> >>>> having control over the job after submission is a requirement that was
> >>>> requested frequently (some examples are [1], [2]). Users would like to
> >>>> get insights about the running or completed job. The JobClient
> >>>> summarizes these properties, including the jobId, jobGraph, etc.
> >>>> It is good to have a discussion about synchronous/asynchronous
> >>>> submission now to have a complete execution picture.
> >>>>
> >>>> I thought we submit streaming queries mostly async and just wait for
> the
> >>>> successful submission. If we block for streaming queries, how can we
> >>>> collect() or print() results?
> >>>>
> >>>> Also, if we block for streaming queries, we could never support
> >>>> multiline files. Because the first INSERT INTO would block the further
> >>>> execution.
> >>>>
> >>>> If we decide to block entirely on streaming queries, we need the async
> >>>> execution methods in the design already. However, I would rather go
> for
> >>>> non-blocking streaming queries. Also with the `EMIT STREAM` keyword in
> >>>> mind that we might add to SQL statements soon.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>
> >>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> Thanks for the update.
> >>>>>
> >>>>> Regarding "multiline statement support", I'm also fine that
> >>>>> `TableEnvironment.executeSql()` only supports single line statement,
> >> and
> >>>> we
> >>>>> can support multiline statement later (needs more discussion about
> >> this).
> >>>>>
> >>>>> Regarding "StatementSet.explain()", I don't have strong opinions
> >> about
> >>>>> that.
> >>>>>
> >>>>> Regarding "TableResult.getJobClient()", I think it's unnecessary.
> >> The
> >>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)  will
> not
> >>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
> >>>>> `StatementSet.execute()` are synchronous method, `TableResult` will
> be
> >>>>> returned only after the job is finished or failed.
> >>>>>
> >>>>> Regarding "whether StatementSet.execute() needs to throw
> >> exception", I
> >>>>> think we should choose a unified way to tell whether the execution is
> >>>>> successful. If `TableResult` contains ERROR kind (non-runtime
> >> exception),
> >>>>> users need to not only check the result but also catch the runtime
> >>>>> exception in their code. or `StatementSet.execute()` does not throw
> any
> >>>>> exception (including runtime exception), all exception messages are
> in
> >>>> the
> >>>>> result.  I prefer "StatementSet.execute() needs to throw exception".
> cc
> >>>> @Jark
> >>>>> Wu <[hidden email]>
> >>>>>
> >>>>> I will update the document with the agreed parts first.
> >>>>>
> >>>>> Best,
> >>>>> Godfrey
> >>>>>
> >>>>>
> >>>>> Timo Walther <[hidden email]> wrote on Wed, Mar 25, 2020 at 6:51 PM:
> >>>>>
> >>>>>> Hi Godfrey,
> >>>>>>
> >>>>>> thanks for starting the discussion on the mailing list. And sorry
> >> again
> >>>>>> for the late reply to FLIP-84. I have updated the Google doc one
> more
> >>>>>> time to incorporate the offline discussions.
> >>>>>>
> >>>>>> From Dawid's and my view, it is fine to postpone the multiline support
> >>>>>> to a separate method. This can be future work even though we will need
> >>>>>> it rather soon.
> >>>>>>
> >>>>>> If there are no objections, I suggest updating FLIP-84 again and
> >>>>>> having another voting process.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>>>> Hi community,
> >>>>>>> Timo, Fabian and Dawid have some feedback about FLIP-84[1]. The feedback
> >>>>>>> is all about the newly introduced methods. We had a discussion yesterday, and
> >>>>>>> most of the feedback has been agreed upon. Here are the conclusions:
> >>>>>>>
> >>>>>>> *1. about proposed methods in `TableEnvironment`:*
> >>>>>>>
> >>>>>>> the original proposed methods:
> >>>>>>>
> >>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>>>> TableEnvironment.executeStatement(String statement): ResultTable
> >>>>>>>
> >>>>>>> the new proposed methods:
> >>>>>>>
> >>>>>>> // we should not use abbreviations in the API, and the term "Batch"
> >> is
> >>>>>>> easily confused with batch/streaming processing
> >>>>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>>>
> >>>>>>> // every method that takes SQL should have `Sql` in its name
> >>>>>>> // supports multiline statement ???
> >>>>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>>>
> >>>>>>> // new methods. supports explaining DQL and DML
> >>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
> >>>> details):
> >>>>>>> String
> >>>>>>>
> >>>>>>>
> >>>>>>> *2. about proposed related classes:*
> >>>>>>>
> >>>>>>> the original proposed classes:
> >>>>>>>
> >>>>>>> interface DmlBatch {
> >>>>>>>         void addInsert(String insert);
> >>>>>>>         void addInsert(String targetPath, Table table);
> >>>>>>>         ResultTable execute() throws Exception ;
> >>>>>>>         String explain(boolean extended);
> >>>>>>> }
> >>>>>>>
> >>>>>>> public interface ResultTable {
> >>>>>>>         TableSchema getResultSchema();
> >>>>>>>         Iterable<Row> getResultRows();
> >>>>>>> }
> >>>>>>>
> >>>>>>> the new proposed classes:
> >>>>>>>
> >>>>>>> interface StatementSet {
> >>>>>>>         // every method that takes SQL should have `Sql` in its
> name
> >>>>>>>         // return StatementSet instance for fluent programming
> >>>>>>>         addInsertSql(String statement): StatementSet
> >>>>>>>
> >>>>>>>         // return StatementSet instance for fluent programming
> >>>>>>>         addInsert(String tablePath, Table table): StatementSet
> >>>>>>>
> >>>>>>>         // new method. support overwrite mode
> >>>>>>>         addInsert(String tablePath, Table table, boolean
> overwrite):
> >>>>>>> StatementSet
> >>>>>>>
> >>>>>>>         explain(): String
> >>>>>>>
> >>>>>>>         // new method. supports adding more details for the result
> >>>>>>>         explain(ExplainDetail... extraDetails): String
> >>>>>>>
> >>>>>>>         // throw exception ???
> >>>>>>>         execute(): TableResult
> >>>>>>> }
> >>>>>>>
> >>>>>>> interface TableResult {
> >>>>>>>         getTableSchema(): TableSchema
> >>>>>>>
> >>>>>>>         // avoid custom parsing of an "OK" row in programming
> >>>>>>>         getResultKind(): ResultKind
> >>>>>>>
> >>>>>>>         // instead of `get` make it explicit that this is might be
> >>>>>> triggering
> >>>>>>> an expensive operation
> >>>>>>>         collect(): Iterable<Row>
> >>>>>>>
> >>>>>>>         // for fluent programming
> >>>>>>>         print(): Unit
> >>>>>>> }
> >>>>>>>
> >>>>>>> enum ResultKind {
> >>>>>>>         SUCCESS, // for DDL, DCL and statements with a simple "OK"
> >>>>>>>         SUCCESS_WITH_CONTENT, // rows with important content are
> >>>> available
> >>>>>>> (DML, DQL)
> >>>>>>> }
> >>>>>>>
> >>>>>>>
> >>>>>>> *3. new proposed methods in `Table`*
> >>>>>>>
> >>>>>>> `Table.insertInto()` will be deprecated, and the following methods
> >> are
> >>>>>>> introduced:
> >>>>>>>
> >>>>>>> Table.executeInsert(String tablePath): TableResult
> >>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
> TableResult
> >>>>>>> Table.explain(ExplainDetail... details): String
> >>>>>>> Table.execute(): TableResult
> >>>>>>>
> >>>>>>> There are two issues that need further discussion: one is whether
> >>>>>>> `TableEnvironment.executeSql(String statement): TableResult` needs to
> >>>>>>> support multiline statements (or whether `TableEnvironment` needs to support
> >>>>>>> multiline statements), and the other is whether `StatementSet.execute()`
> >>>>>>> needs to throw an exception.
> >>>>>>>
> >>>>>>> please refer to the feedback document [2] for the details.
> >>>>>>>
> >>>>>>> Any suggestions are warmly welcomed!
> >>>>>>>
> >>>>>>> [1]
> >>>>>>>
> >>>>>>
> >>>>
> >>
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>>>>> [2]
> >>>>>>>
> >>>>>>
> >>>>
> >>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Godfrey
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >>
> >
>
>

Re: [DISCUSS] FLIP-84 Feedback Summary

godfreyhe
Hi, Timo & Jark

Thanks for your explanation.
Agree with you that streaming queries should always be async,
and the sync execution scenario can be covered by async execution.
It helps provide a unified entry point for batch and streaming.
I think we can also use sync execution for some testing.
So, I agree that we provide the `executeSql` method as an async
method.
If we want a sync method in the future, we can add a method named
`executeSqlSync`.

I think we've reached an agreement. I will update the document and start
the voting process.

Best,
Godfrey
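A minimal sketch of the point agreed on above, namely that sync execution can always be layered on the async entry point. The method names (`executeSqlAsync`, `executeSqlSync`) follow this thread's proposal; the body is a mock computation standing in for a real Flink job, not Flink's implementation:

```java
import java.util.concurrent.CompletableFuture;

// Mock illustrating why an async executeSql is the more general entry
// point: a sync variant can simply wait on the async result. Names follow
// this thread's proposal; the "job" here is a stand-in computation.
public class AsyncSketch {

    // Async: submit the statement and return immediately with a future result.
    static CompletableFuture<String> executeSqlAsync(String statement) {
        return CompletableFuture.supplyAsync(() -> "OK: " + statement);
    }

    // Sync: block until the "job" finishes, built entirely on the async method.
    static String executeSqlSync(String statement) throws Exception {
        return executeSqlAsync(statement).get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(executeSqlSync("CREATE TABLE t1 (x INT)")); // OK: CREATE TABLE t1 (x INT)
    }
}
```

This is why the async method is the more general entry point: blocking is just waiting on the future, while deriving async behavior from a blocking call is not possible.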


Jark Wu <[hidden email]> wrote on Tue, Mar 31, 2020 at 12:46 AM:

> Hi,
>
> I didn't follow the full discussion.
> But I share the same concern as Timo that streaming queries should always
> be async.
> Otherwise, I can imagine it will cause a lot of confusion and problems if
> users don't keep the "sync" firmly in mind (e.g. the client hangs).
> Besides, streaming mode is still the majority of use cases for Flink and
> Flink SQL. We should put usability at a high priority.
>
> Best,
> Jark
>
>
> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:
>
> > Hi Godfrey,
> >
> > maybe I didn't express my biggest concern clearly enough in my last mail.
> > Even in a single-line, sync execution, I think that streaming queries
> > should not block the execution. Otherwise it is not possible to call
> > collect() or print() on them afterwards.
> >
> > "there are too many things that need to be discussed for multiline":
> >
> > True, I don't want to solve all of them right now. But what I know is
> > that our newly introduced methods should fit into a multiline execution.
> > There is no big difference between calling `executeSql(A), executeSql(B)` and
> > processing a multiline file `A;\nB;`.
> >
> > I think the example that you mentioned can simply be undefined for now.
> > Currently, no catalog is modifying data but just metadata. This is a
> > separate discussion.
> >
> > "result of the second statement is indeterministic":
> >
> > Sure, this is indeterministic. But this is the implementer's fault and we
> > cannot forbid such pipelines.
> >
> > How about we always execute streaming queries async? It would unblock
> > executeSql() and multiline statements.
> >
> > Having an `executeSqlAsync()` is useful for batch. However, I don't want
> > `sync/async` to be the new batch/stream flag. The execution behavior should
> > come from the query itself.
> >
> > Regards,
> > Timo
> >
> >
> > On 30.03.20 11:12, godfrey he wrote:
> > > Hi Timo,
> > >
> > > Agree with you that streaming queries are our top priority,
> > > but I think there are too many things that need to be discussed for multiline
> > > statements:
> > > e.g.
> > > 1. what's the behavior of DDL and DML mixing for async execution:
> > > create table t1 xxx;
> > > create table t2 xxx;
> > > insert into t2 select * from t1 where xxx;
> > > drop table t1; // t1 may be a MySQL table, the data will also be
> deleted.
> > >
> > > t1 is dropped while the "insert" job is running.
> > >
> > > 2. what's the behavior of the unified scenario for async execution (as you
> > > mentioned):
> > > INSERT INTO t1 SELECT * FROM s;
> > > INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> > >
> > > The result of the second statement is indeterministic, because the first
> > > statement may still be running.
> > > I think we need to put a lot of effort into defining the behavior of
> > > logically related queries.
> > >
> > > In this FLIP, I suggest we only handle a single statement, and we also
> > > introduce an async execute method,
> > > which is more important and more often used by users.
> > >
> > > For the sync methods (like `TableEnvironment.executeSql` and
> > > `StatementSet.execute`),
> > > the result will not be returned until the job is finished. The following
> > > methods will be introduced in this FLIP:
> > >
> > >   /**
> > >    * Asynchronously execute the given single statement
> > >    */
> > > TableEnvironment.executeSqlAsync(String statement): TableResult
> > >
> > > /**
> > >   * Asynchronously execute the dml statements as a batch
> > >   */
> > > StatementSet.executeAsync(): TableResult
> > >
> > > public interface TableResult {
> > >     /**
> > >      * return JobClient for DQL and DML in async mode, else return
> > > Optional.empty
> > >      */
> > >     Optional<JobClient> getJobClient();
> > > }
> > >
> > > what do you think?
> > >
> > > Best,
> > > Godfrey
> > >
> > Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 9:15 PM:
> > >
> > >> Hi Godfrey,
> > >>
> > >> executing streaming queries must be our top priority because this is
> > >> what distinguishes Flink from competitors. If we change the execution
> > >> behavior, we should think about the other cases as well to not break
> the
> > >> API a third time.
> > >>
> > >> I fear that just having an async execute method will not be enough
> > >> because users should be able to mix streaming and batch queries in a
> > >> unified scenario.
> > >>
> > >> If I remember it correctly, we had some discussions in the past about
> > >> what decides about the execution mode of a query. Currently, we would
> > >> like to let the query decide, not derive it from the sources.
> > >>
> > >> So I could imagine a multiline pipeline as:
> > >>
> > >> USE CATALOG 'mycat';
> > >> INSERT INTO t1 SELECT * FROM s;
> > >> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> > >>
> > >> For executeMultilineSql():
> > >>
> > >> sync because regular SQL
> > >> sync because regular Batch SQL
> > >> async because Streaming SQL
> > >>
> > >> For executeAsyncMultilineSql():
> > >>
> > >> async because everything should be async
> > >> async because everything should be async
> > >> async because everything should be async
> > >>
> > >> What we should not do for executeAsyncMultilineSql():
> > >>
> > >> sync because DDL
> > >> async because everything should be async
> > >> async because everything should be async
> > >>
> > >> What are your thoughts here?
> > >>
> > >> Regards,
> > >> Timo
> > >>
> > >>
> > >> On 26.03.20 12:50, godfrey he wrote:
> > >>> Hi Timo,
> > >>>
> > >>> I agree with you that streaming queries mostly need async execution.
> > >>> In fact, our original plan was to only introduce sync methods in this
> > >>> FLIP, and async methods (like "executeSqlAsync") will be introduced in the
> > >>> future, as mentioned in the appendix.
> > >>>
> > >>> Maybe the async methods also need to be considered in this FLIP.
> > >>>
> > >>> I think sync methods are also useful for streaming, where they can be
> > >>> used to run bounded sources.
> > >>> Maybe we should check whether all sources are bounded in sync
> execution
> > >>> mode.
> > >>>
> > >>>> Also, if we block for streaming queries, we could never support
> > >>>> multiline files. Because the first INSERT INTO would block the
> further
> > >>>> execution.
> > >>> Agree with you; we need an async method to submit multiline files,
> > >>> and such files should be limited so that DQL and DML always come at the
> > >>> end for streaming.
> > >>>
> > >>> Best,
> > >>> Godfrey
> > >>>
> > >>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 4:29 PM:
> > >>>
> > >>>> Hi Godfrey,
> > >>>>
> > >>>> having control over the job after submission is a requirement that
> was
> > >>>> requested frequently (some examples are [1], [2]). Users would like
> to
> > >>>> get insights about the running or completed job. The JobClient
> > >>>> summarizes these properties, including the jobId, jobGraph, etc.
> > >>>>
> > >>>> It is good to have a discussion about synchronous/asynchronous
> > >>>> submission now to have a complete execution picture.
> > >>>>
> > >>>> I thought we submit streaming queries mostly async and just wait for
> > the
> > >>>> successful submission. If we block for streaming queries, how can we
> > >>>> collect() or print() results?
> > >>>>
> > >>>> Also, if we block for streaming queries, we could never support
> > >>>> multiline files. Because the first INSERT INTO would block the
> further
> > >>>> execution.
> > >>>>
> > >>>> If we decide to block entirely on streaming queries, we need the
> async
> > >>>> execution methods in the design already. However, I would rather go
> > for
> > >>>> non-blocking streaming queries. Also with the `EMIT STREAM` key word
> > in
> > >>>> mind that we might add to SQL statements soon.
> > >>>>
> > >>>> Regards,
> > >>>> Timo
> > >>>>
> > >>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> > >>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> > >>>>
> > >>>> On 25.03.20 16:30, godfrey he wrote:
> > >>>>> Hi Timo,
> > >>>>>
> > >>>>> Thanks for the update.
> > >>>>>
> > >>>>> Regarding "multiline statement support", I'm also fine that
> > >>>>> `TableEnvironment.executeSql()` only supports single line
> statement,
> > >> and
> > >>>> we
> > >>>>> can support multiline statement later (needs more discussion about
> > >> this).
> > >>>>>
> > >>>>> Regarding "StatementSet.explain()", I don't have strong opinions
> > >> about
> > >>>>> that.
> > >>>>>
> > >>>>> Regarding "TableResult.getJobClient()", I think it's
> unnecessary.
> > >> The
> > >>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)  will
> > not
> > >>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
> > >>>>> `StatementSet.execute()` are synchronous methods, `TableResult` will
> > be
> > >>>>> returned only after the job is finished or failed.
> > >>>>>
> > >>>>> Regarding "whether StatementSet.execute() needs to throw
> > >> exception", I
> > >>>>> think we should choose a unified way to tell whether the execution
> is
> > >>>>> successful. If `TableResult` contains ERROR kind (non-runtime
> > >> exception),
> > >>>>> users need to not only check the result but also catch the runtime
> > >>>>> exception in their code. Or `StatementSet.execute()` does not throw
> > any
> > >>>>> exception (including runtime exception), all exception messages are
> > in
> > >>>> the
> > >>>>> result.  I prefer "StatementSet.execute() needs to throw
> exception".
> > cc
> > >>>> @Jark
> > >>>>> Wu <[hidden email]>
> > >>>>>
> > >>>>> I will update the agreed parts to the document first.
> > >>>>>
> > >>>>> Best,
> > >>>>> Godfrey
> > >>>>>
> > >>>>>
> > >>>>> Timo Walther <[hidden email]> 于2020年3月25日周三 下午6:51写道:
> > >>>>>
> > >>>>>> Hi Godfrey,
> > >>>>>>
> > >>>>>> thanks for starting the discussion on the mailing list. And sorry
> > >> again
> > >>>>>> for the late reply to FLIP-84. I have updated the Google doc one
> > more
> > >>>>>> time to incorporate the offline discussions.
> > >>>>>>
> > >>>>>>     From Dawid's and my view, it is fine to postpone the multiline
> > >> support
> > >>>>>> to a separate method. This can be future work even though we will
> > need
> > >>>>>> it rather soon.
> > >>>>>>
> > >>>>>> If there are no objections, I suggest to update the FLIP-84 again
> > and
> > >>>>>> have another voting process.
> > >>>>>>
> > >>>>>> Thanks,
> > >>>>>> Timo
> > >>>>>>
> > >>>>>>
> > >>>>>> On 25.03.20 11:17, godfrey he wrote:
> > >>>>>>> Hi community,
> > >>>>>>> Timo, Fabian and Dawid have some feedbacks about FLIP-84[1]. The
> > >>>>>> feedbacks
> > >>>>>>> are all about newly introduced methods. We had a discussion
> > yesterday,
> > >>>> and
> > >>>>>>> most of the feedback has been agreed upon. Here are the conclusions:
> > >>>>>>>
> > >>>>>>> *1. about proposed methods in `TableEnvironment`:*
> > >>>>>>>
> > >>>>>>> the original proposed methods:
> > >>>>>>>
> > >>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
> > >>>>>>> TableEnvironment.executeStatement(String statement): ResultTable
> > >>>>>>>
> > >>>>>>> the new proposed methods:
> > >>>>>>>
> > >>>>>>> // we should not use abbreviations in the API, and the term
> "Batch"
> > >> is
> > >>>>>>> easily confused with batch/streaming processing
> > >>>>>>> TableEnvironment.createStatementSet(): StatementSet
> > >>>>>>>
> > >>>>>>> // every method that takes SQL should have `Sql` in its name
> > >>>>>>> // supports multiline statement ???
> > >>>>>>> TableEnvironment.executeSql(String statement): TableResult
> > >>>>>>>
> > >>>>>>> // new methods. supports explaining DQL and DML
> > >>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
> > >>>> details):
> > >>>>>>> String
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> *2. about proposed related classes:*
> > >>>>>>>
> > >>>>>>> the original proposed classes:
> > >>>>>>>
> > >>>>>>> interface DmlBatch {
> > >>>>>>>         void addInsert(String insert);
> > >>>>>>>         void addInsert(String targetPath, Table table);
> > >>>>>>>         ResultTable execute() throws Exception ;
> > >>>>>>>         String explain(boolean extended);
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> public interface ResultTable {
> > >>>>>>>         TableSchema getResultSchema();
> > >>>>>>>         Iterable<Row> getResultRows();
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> the new proposed classes:
> > >>>>>>>
> > >>>>>>> interface StatementSet {
> > >>>>>>>         // every method that takes SQL should have `Sql` in its
> > name
> > >>>>>>>         // return StatementSet instance for fluent programming
> > >>>>>>>         addInsertSql(String statement): StatementSet
> > >>>>>>>
> > >>>>>>>         // return StatementSet instance for fluent programming
> > >>>>>>>         addInsert(String tablePath, Table table): StatementSet
> > >>>>>>>
> > >>>>>>>         // new method. support overwrite mode
> > >>>>>>>         addInsert(String tablePath, Table table, boolean
> > overwrite):
> > >>>>>>> StatementSet
> > >>>>>>>
> > >>>>>>>         explain(): String
> > >>>>>>>
> > >>>>>>>         // new method. supports adding more details for the
> result
> > >>>>>>>         explain(ExplainDetail... extraDetails): String
> > >>>>>>>
> > >>>>>>>         // throw exception ???
> > >>>>>>>         execute(): TableResult
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> interface TableResult {
> > >>>>>>>         getTableSchema(): TableSchema
> > >>>>>>>
> > >>>>>>>         // avoid custom parsing of an "OK" row in programming
> > >>>>>>>         getResultKind(): ResultKind
> > >>>>>>>
> > >>>>>>>         // instead of `get` make it explicit that this is might
> be
> > >>>>>> triggering
> > >>>>>>> an expensive operation
> > >>>>>>>         collect(): Iterable<Row>
> > >>>>>>>
> > >>>>>>>         // for fluent programming
> > >>>>>>>         print(): Unit
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>> enum ResultKind {
> > >>>>>>>         SUCCESS, // for DDL, DCL and statements with a simple
> "OK"
> > >>>>>>>         SUCCESS_WITH_CONTENT, // rows with important content are
> > >>>> available
> > >>>>>>> (DML, DQL)
> > >>>>>>> }
> > >>>>>>>
> > >>>>>>>
> > >>>>>>> *3. new proposed methods in `Table`*
> > >>>>>>>
> > >>>>>>> `Table.insertInto()` will be deprecated, and the following
> methods
> > >> are
> > >>>>>>> introduced:
> > >>>>>>>
> > >>>>>>> Table.executeInsert(String tablePath): TableResult
> > >>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
> > TableResult
> > >>>>>>> Table.explain(ExplainDetail... details): String
> > >>>>>>> Table.execute(): TableResult
> > >>>>>>>
> > >>>>>>> There are two issues that need further discussion: one is whether
> > >>>>>>> `TableEnvironment.executeSql(String statement): TableResult`
> needs
> > to
> > >>>>>>> support multiline statement (or whether `TableEnvironment` needs
> to
> > >>>>>> support
> > >>>>>>> multiline statement), and another one is whether
> > >>>> `StatementSet.execute()`
> > >>>>>>> needs to throw exception.
> > >>>>>>>
> > >>>>>>> please refer to the feedback document [2] for the details.
> > >>>>>>>
> > >>>>>>> Any suggestions are warmly welcomed!
> > >>>>>>>
> > >>>>>>> [1]
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>
> >
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> > >>>>>>> [2]
> > >>>>>>>
> > >>>>>>
> > >>>>
> > >>
> >
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> > >>>>>>>
> > >>>>>>> Best,
> > >>>>>>> Godfrey
> > >>>>>>>
> > >>>>>>
> > >>>>>>
> > >>>>>
> > >>>>
> > >>>>
> > >>>
> > >>
> > >>
> > >
> >
> >
>

Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Godfrey,

Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
particular, we discussed how the current status of the FLIP and the
future requirements around multiline statements, async/sync, collect()
fit together.

We also updated the FLIP-84 Feedback Summary document [1] with some use
cases.

We believe that we found a good solution that also fits what is in
the current FLIP. So no big changes are necessary, which is great!

Our findings were:

1. Async vs sync submission of Flink jobs:

Having a blocking `execute()` in the DataStream API was rather a mistake.
Instead, all submissions should be async because this allows supporting
both modes if necessary. Thus, submitting all queries async sounds good
to us. If users want to run a job sync, they can use the JobClient and
wait for completion (or collect() in the case of batch jobs).
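To make this concrete, here is a minimal sketch of the submit-async-then-optionally-block pattern using plain JDK types. `JobClient` and `executeSql` below are hypothetical stand-ins for the proposed API, modeled with a `CompletableFuture`; they are not Flink code.

```java
import java.util.concurrent.CompletableFuture;

public class AsyncSubmissionSketch {

    /** Minimal model of the proposed JobClient: a handle to a submitted job. */
    static final class JobClient {
        private final CompletableFuture<String> result;
        JobClient(CompletableFuture<String> result) { this.result = result; }
        /** Blocking here is how "sync" behavior is recovered on demand. */
        String waitForCompletion() { return result.join(); }
    }

    /** Models the proposed executeSql(): submits and returns immediately. */
    static JobClient executeSql(String statement) {
        return new JobClient(
                CompletableFuture.supplyAsync(() -> "FINISHED: " + statement));
    }

    public static void main(String[] args) {
        // Submission does not block; waiting is an explicit, separate choice.
        JobClient client = executeSql("INSERT INTO t1 SELECT * FROM s");
        System.out.println(client.waitForCompletion());
    }
}
```

The point of the sketch is only the shape of the contract: async is the default, and sync is one extra call on the returned handle.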

2. Multi-statement execution:

For the multi-statement execution, we don't see a contradiction with
the async execution behavior. We imagine a method like:

TableEnvironment#executeMultilineSql(String statements):
Iterable<TableResult>

The `Iterator#next()` method would trigger the submission of the next
statement. This allows a caller to decide synchronously when to submit
statements asynchronously to the cluster. Thus, a service such as the
SQL Client can handle the result of each statement individually and
process the statements sequentially, one by one.
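As a rough illustration of that lazy-submission contract (all names below are illustrative stand-ins for the proposal, not real Flink classes), `next()` is the only place where work would be triggered:

```java
import java.util.Iterator;
import java.util.List;

public class MultilineSketch {

    /** Stand-in for the proposed TableResult. */
    record TableResult(String statement, String resultKind) {}

    /** Statements are parsed up front, but each is "submitted" only in next(). */
    static Iterator<TableResult> executeMultilineSql(String statements) {
        Iterator<String> stmts = List.of(statements.split(";")).iterator();
        return new Iterator<>() {
            public boolean hasNext() { return stmts.hasNext(); }
            public TableResult next() {
                String stmt = stmts.next().trim();
                // A real implementation would submit the statement to the
                // cluster here; we only simulate the resulting kind.
                String kind = stmt.toUpperCase().startsWith("CREATE")
                        ? "SUCCESS" : "SUCCESS_WITH_CONTENT";
                return new TableResult(stmt, kind);
            }
        };
    }
}
```

A caller like the SQL Client can then pause between `next()` calls, inspect each `TableResult`, and decide when to advance.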

3. The role of TableResult and result retrieval in general

`TableResult` is similar to `JobClient`. Instead of returning a
`CompletableFuture` of something, it is a concrete utility class where
some methods have the behavior of a completable future (e.g. collect(),
print()) and some are already completed (getTableSchema(), getResultKind()).

`StatementSet#execute()` returns a single `TableResult` because the
order is undefined in a set and all statements have the same schema. Its
`collect()` will return a row for each executed `INSERT INTO` in the
order of statement definition.
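A minimal model of that single-result contract might look as follows; `StatementSetSketch` and its one-string-per-INSERT result are illustrative stand-ins, not the proposed interface itself:

```java
import java.util.ArrayList;
import java.util.List;

public class StatementSetSketch {
    private final List<String> inserts = new ArrayList<>();

    /** Returns this for fluent chaining, as in the proposal. */
    StatementSetSketch addInsertSql(String statement) {
        inserts.add(statement);
        return this;
    }

    /**
     * One result for the whole set: a row per INSERT, in definition order
     * (a real implementation would report affected row counts here).
     */
    List<String> execute() {
        List<String> rows = new ArrayList<>();
        for (String stmt : inserts) {
            rows.add("OK: " + stmt);
        }
        return rows;
    }
}
```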

For a simple `SELECT * FROM ...`, the query execution might block until
`collect()` is called to pull buffered rows from the job (via a socket,
the REST API, or whatever we will use in the future). We can say that a
statement has finished successfully when `collect()`'s
`Iterator#hasNext` returns false.

I hope this summarizes our discussion @Dawid/Aljoscha/Klou?

It would be great if we can add these findings to the FLIP before we
start voting.

One minor thing: some `execute()` methods still throw a checked
exception; can we remove that from the FLIP? Also, the above-mentioned
`Iterator#next()` would trigger an execution without throwing a checked
exception.

Thanks,
Timo

[1]
https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#

On 31.03.20 06:28, godfrey he wrote:

> Hi, Timo & Jark
>
> Thanks for your explanation.
> Agree with you that execution should always be async,
> and the sync execution scenario can be covered by async execution.
> It helps provide a unified entry point for batch and streaming.
> I think we can also use sync execution for some testing.
> So, I agree with you that we provide the `executeSql` method and it's an async
> method.
> If we want sync method in the future, we can add method named
> `executeSqlSync`.
>
> I think we've reached an agreement. I will update the document, and start
> voting process.
>
> Best,
> Godfrey
>
>
> Jark Wu <[hidden email]> 于2020年3月31日周二 上午12:46写道:
>
>> Hi,
>>
>> I didn't follow the full discussion.
>> But I share the same concern with Timo that streaming queries should always
>> be async.
>> Otherwise, I can imagine it will cause a lot of confusion and problems if
>> users don't deeply keep the "sync" in mind (e.g. client hangs).
>> Besides, the streaming mode is still the majority use cases of Flink and
>> Flink SQL. We should put the usability at a high priority.
>>
>> Best,
>> Jark
>>
>>
>> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:
>>
>>> Hi Godfrey,
>>>
>>> maybe I wasn't expressing my biggest concern enough in my last mail.
>>> Even in a singleline and sync execution, I think that streaming queries
>>> should not block the execution. Otherwise it is not possible to call
>>> collect() or print() on them afterwards.
>>>
>>> "there are too many things need to discuss for multiline":
>>>
>>> True, I don't want to solve all of them right now. But what I know is
>>> that our newly introduced methods should fit into a multiline execution.
>>> There is no big difference of calling `executeSql(A), executeSql(B)` and
>>> processing a multiline file `A;\nB;`.
>>>
>>> I think the example that you mentioned can simply be undefined for now.
>>> Currently, no catalog is modifying data but just metadata. This is a
>>> separate discussion.
>>>
>>> "result of the second statement is indeterministic":
>>>
>>> Sure this is indeterministic. But this is the implementer's fault and we
>>> cannot forbid such pipelines.
>>>
>>> How about we always execute streaming queries async? It would unblock
>>> executeSql() and multiline statements.
>>>
>>> Having a `executeSqlAsync()` is useful for batch. However, I don't want
>>> `sync/async` be the new batch/stream flag. The execution behavior should
>>> come from the query itself.
>>>
>>> Regards,
>>> Timo
>>>
>>>
>>> On 30.03.20 11:12, godfrey he wrote:
>>>> Hi Timo,
>>>>
>>>> Agree with you that streaming queries are our top priority,
>>>> but I think there are too many things need to discuss for multiline
>>>> statements:
>>>> e.g.
>>>> 1. what's the behavior of DDL and DML mixing for async execution:
>>>> create table t1 xxx;
>>>> create table t2 xxx;
>>>> insert into t2 select * from t1 where xxx;
>>>> drop table t1; // t1 may be a MySQL table, the data will also be
>> deleted.
>>>>
>>>> t1 is dropped when "insert" job is running.
>>>>
>>>> 2. what's the behavior of the unified scenario for async execution: (as you
>>>> mentioned)
>>>> INSERT INTO t1 SELECT * FROM s;
>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>
>>>> The result of the second statement is indeterministic, because the
>> first
>>>> statement maybe is running.
>>>> I think we need to put a lot of effort to define the behavior of
>>> logically
>>>> related queries.
>>>>
>>>> In this FLIP, I suggest we only handle single statements, and we also
>>>> introduce an async execute method,
>>>> which is more important and more often used by users.
>>>>
>>>> For the sync methods (like `TableEnvironment.executeSql` and
>>>> `StatementSet.execute`),
>>>> the result will be returned only after the job is finished. The following
>>>> methods will be introduced in this FLIP:
>>>>
>>>>    /**
>>>>     * Asynchronously execute the given single statement
>>>>     */
>>>> TableEnvironment.executeSqlAsync(String statement): TableResult
>>>>
>>>> /**
>>>>    * Asynchronously execute the dml statements as a batch
>>>>    */
>>>> StatementSet.executeAsync(): TableResult
>>>>
>>>> public interface TableResult {
>>>>      /**
>>>>       * return JobClient for DQL and DML in async mode, else return
>>>> Optional.empty
>>>>       */
>>>>      Optional<JobClient> getJobClient();
>>>> }
>>>>
>>>> what do you think?
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> Timo Walther <[hidden email]> 于2020年3月26日周四 下午9:15写道:
>>>>
>>>>> Hi Godfrey,
>>>>>
>>>>> executing streaming queries must be our top priority because this is
>>>>> what distinguishes Flink from competitors. If we change the execution
>>>>> behavior, we should think about the other cases as well to not break
>> the
>>>>> API a third time.
>>>>>
>>>>> I fear that just having an async execute method will not be enough
>>>>> because users should be able to mix streaming and batch queries in a
>>>>> unified scenario.
>>>>>
>>>>> If I remember it correctly, we had some discussions in the past about
>>>>> what decides about the execution mode of a query. Currently, we would
>>>>> like to let the query decide, not derive it from the sources.
>>>>>
>>>>> So I could image a multiline pipeline as:
>>>>>
>>>>> USE CATALOG 'mycat';
>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>
>>>>> For executeMultilineSql():
>>>>>
>>>>> sync because regular SQL
>>>>> sync because regular Batch SQL
>>>>> async because Streaming SQL
>>>>>
>>>>> For executeAsyncMultilineSql():
>>>>>
>>>>> async because everything should be async
>>>>> async because everything should be async
>>>>> async because everything should be async
>>>>>
>>>>> What we should not start for executeAsyncMultilineSql():
>>>>>
>>>>> sync because DDL
>>>>> async because everything should be async
>>>>> async because everything should be async
>>>>>
>>>>> What are your thoughts here?
>>>>>
>>>>> Regards,
>>>>> Timo
>>>>>
>>>>>
>>>>> On 26.03.20 12:50, godfrey he wrote:
>>>>>> Hi Timo,
>>>>>>
>>>>>> I agree with you that streaming queries mostly need async execution.
>>>>>> In fact, our original plan is only introducing sync methods in this
>>> FLIP,
>>>>>> and async methods (like "executeSqlAsync") will be introduced in the
>>>>> future
>>>>>> which is mentioned in the appendix.
>>>>>>
>>>>>> Maybe the async methods also need to be considered in this FLIP.
>>>>>>
>>>>>> I think sync methods are also useful for streaming, which can be used
>> to
>>>>> run
>>>>>> bounded source.
>>>>>> Maybe we should check whether all sources are bounded in sync
>> execution
>>>>>> mode.
>>>>>>
>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>> multiline files. Because the first INSERT INTO would block the
>> further
>>>>>>> execution.
>>>>>> Agree with you, we need an async method to submit multiline files,
>>>>>> and such files should be limited so that the DQL and DML are always
>>>>>> at the end for streaming.
>>>>>>
>>>>>> Best,
>>>>>> Godfrey
>>>>>>
>>>>>> Timo Walther <[hidden email]> 于2020年3月26日周四 下午4:29写道:
>>>>>>
>>>>>>> Hi Godfrey,
>>>>>>>
>>>>>>> having control over the job after submission is a requirement that
>> was
>>>>>>> requested frequently (some examples are [1], [2]). Users would like
>> to
>>>>>>> get insights about the running or completed job. Including the
>> jobId,
>>>>>>> jobGraph etc., the JobClient summarizes these properties.
>>>>>>>
>>>>>>> It is good to have a discussion about synchronous/asynchronous
>>>>>>> submission now to have a complete execution picture.
>>>>>>>
>>>>>>> I thought we submit streaming queries mostly async and just wait for
>>> the
>>>>>>> successful submission. If we block for streaming queries, how can we
>>>>>>> collect() or print() results?
>>>>>>>
>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>> multiline files. Because the first INSERT INTO would block the
>> further
>>>>>>> execution.
>>>>>>>
>>>>>>> If we decide to block entirely on streaming queries, we need the
>> async
>>>>>>> execution methods in the design already. However, I would rather go
>>> for
>>>>>>> non-blocking streaming queries. Also with the `EMIT STREAM` key word
>>> in
>>>>>>> mind that we might add to SQL statements soon.
>>>>>>>
>>>>>>> Regards,
>>>>>>> Timo
>>>>>>>
>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
>>>>>>>
>>>>>>> On 25.03.20 16:30, godfrey he wrote:
>>>>>>>> Hi Timo,
>>>>>>>>
>>>>>>>> Thanks for the update.
>>>>>>>>
>>>>>>>> Regarding "multiline statement support", I'm also fine that
>>>>>>>> `TableEnvironment.executeSql()` only supports single line
>> statement,
>>>>> and
>>>>>>> we
>>>>>>>> can support multiline statement later (needs more discussion about
>>>>> this).
>>>>>>>>
>>>>>>>> Regarding "StatementSet.explain()", I don't have strong opinions
>>>>> about
>>>>>>>> that.
>>>>>>>>
>>>>>>>> Regarding "TableResult.getJobClient()", I think it's
>> unnecessary.
>>>>> The
>>>>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)  will
>>> not
>>>>>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
>>>>>>>> `StatementSet.execute()` are synchronous methods, `TableResult` will
>>> be
>>>>>>>> returned only after the job is finished or failed.
>>>>>>>>
>>>>>>>> Regarding "whether StatementSet.execute() needs to throw
>>>>> exception", I
>>>>>>>> think we should choose a unified way to tell whether the execution
>> is
>>>>>>>> successful. If `TableResult` contains ERROR kind (non-runtime
>>>>> exception),
>>>>>>>> users need to not only check the result but also catch the runtime
>>>>>>>> exception in their code. Or `StatementSet.execute()` does not throw
>>> any
>>>>>>>> exception (including runtime exception), all exception messages are
>>> in
>>>>>>> the
>>>>>>>> result.  I prefer "StatementSet.execute() needs to throw
>> exception".
>>> cc
>>>>>>> @Jark
>>>>>>>> Wu <[hidden email]>
>>>>>>>>
>>>>>>>> I will update the agreed parts to the document first.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Godfrey
>>>>>>>>
>>>>>>>>
>>>>>>>> Timo Walther <[hidden email]> 于2020年3月25日周三 下午6:51写道:
>>>>>>>>
>>>>>>>>> Hi Godfrey,
>>>>>>>>>
>>>>>>>>> thanks for starting the discussion on the mailing list. And sorry
>>>>> again
>>>>>>>>> for the late reply to FLIP-84. I have updated the Google doc one
>>> more
>>>>>>>>> time to incorporate the offline discussions.
>>>>>>>>>
>>>>>>>>>      From Dawid's and my view, it is fine to postpone the multiline
>>>>> support
>>>>>>>>> to a separate method. This can be future work even though we will
>>> need
>>>>>>>>> it rather soon.
>>>>>>>>>
>>>>>>>>> If there are no objections, I suggest to update the FLIP-84 again
>>> and
>>>>>>>>> have another voting process.
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
>>>>>>>>>> Hi community,
>>>>>>>>>> Timo, Fabian and Dawid have some feedbacks about FLIP-84[1]. The
>>>>>>>>> feedbacks
>>>>>>>>>> are all about newly introduced methods. We had a discussion
>>> yesterday,
>>>>>>> and
>>>>>>>>>> most of the feedback has been agreed upon. Here are the conclusions:
>>>>>>>>>>
>>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
>>>>>>>>>>
>>>>>>>>>> the original proposed methods:
>>>>>>>>>>
>>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
>>>>>>>>>> TableEnvironment.executeStatement(String statement): ResultTable
>>>>>>>>>>
>>>>>>>>>> the new proposed methods:
>>>>>>>>>>
>>>>>>>>>> // we should not use abbreviations in the API, and the term
>> "Batch"
>>>>> is
>>>>>>>>>> easily confused with batch/streaming processing
>>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
>>>>>>>>>>
>>>>>>>>>> // every method that takes SQL should have `Sql` in its name
>>>>>>>>>> // supports multiline statement ???
>>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
>>>>>>>>>>
>>>>>>>>>> // new methods. supports explaining DQL and DML
>>>>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
>>>>>>> details):
>>>>>>>>>> String
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *2. about proposed related classes:*
>>>>>>>>>>
>>>>>>>>>> the original proposed classes:
>>>>>>>>>>
>>>>>>>>>> interface DmlBatch {
>>>>>>>>>>          void addInsert(String insert);
>>>>>>>>>>          void addInsert(String targetPath, Table table);
>>>>>>>>>>          ResultTable execute() throws Exception ;
>>>>>>>>>>          String explain(boolean extended);
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> public interface ResultTable {
>>>>>>>>>>          TableSchema getResultSchema();
>>>>>>>>>>          Iterable<Row> getResultRows();
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> the new proposed classes:
>>>>>>>>>>
>>>>>>>>>> interface StatementSet {
>>>>>>>>>>          // every method that takes SQL should have `Sql` in its
>>> name
>>>>>>>>>>          // return StatementSet instance for fluent programming
>>>>>>>>>>          addInsertSql(String statement): StatementSet
>>>>>>>>>>
>>>>>>>>>>          // return StatementSet instance for fluent programming
>>>>>>>>>>          addInsert(String tablePath, Table table): StatementSet
>>>>>>>>>>
>>>>>>>>>>          // new method. support overwrite mode
>>>>>>>>>>          addInsert(String tablePath, Table table, boolean
>>> overwrite):
>>>>>>>>>> StatementSet
>>>>>>>>>>
>>>>>>>>>>          explain(): String
>>>>>>>>>>
>>>>>>>>>>          // new method. supports adding more details for the
>> result
>>>>>>>>>>          explain(ExplainDetail... extraDetails): String
>>>>>>>>>>
>>>>>>>>>>          // throw exception ???
>>>>>>>>>>          execute(): TableResult
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> interface TableResult {
>>>>>>>>>>          getTableSchema(): TableSchema
>>>>>>>>>>
>>>>>>>>>>          // avoid custom parsing of an "OK" row in programming
>>>>>>>>>>          getResultKind(): ResultKind
>>>>>>>>>>
>>>>>>>>>>          // instead of `get` make it explicit that this is might
>> be
>>>>>>>>> triggering
>>>>>>>>>> an expensive operation
>>>>>>>>>>          collect(): Iterable<Row>
>>>>>>>>>>
>>>>>>>>>>          // for fluent programming
>>>>>>>>>>          print(): Unit
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> enum ResultKind {
>>>>>>>>>>          SUCCESS, // for DDL, DCL and statements with a simple
>> "OK"
>>>>>>>>>>          SUCCESS_WITH_CONTENT, // rows with important content are
>>>>>>> available
>>>>>>>>>> (DML, DQL)
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> *3. new proposed methods in `Table`*
>>>>>>>>>>
>>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
>> methods
>>>>> are
>>>>>>>>>> introduced:
>>>>>>>>>>
>>>>>>>>>> Table.executeInsert(String tablePath): TableResult
>>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
>>> TableResult
>>>>>>>>>> Table.explain(ExplainDetail... details): String
>>>>>>>>>> Table.execute(): TableResult
>>>>>>>>>>
>>>>>>>>>> There are two issues that need further discussion: one is whether
>>>>>>>>>> `TableEnvironment.executeSql(String statement): TableResult`
>> needs
>>> to
>>>>>>>>>> support multiline statement (or whether `TableEnvironment` needs
>> to
>>>>>>>>> support
>>>>>>>>>> multiline statement), and another one is whether
>>>>>>> `StatementSet.execute()`
>>>>>>>>>> needs to throw exception.
>>>>>>>>>>
>>>>>>>>>> please refer to the feedback document [2] for the details.
>>>>>>>>>>
>>>>>>>>>> Any suggestions are warmly welcomed!
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>>>>>>>>> [2]
>>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>
>>>
>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Godfrey
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>>
>>
>


Re: [DISCUSS] FLIP-84 Feedback Summary

dwysakowicz
Thank you Timo for the great summary! It covers (almost) all the topics.
Even though in the end we are not suggesting many changes to the current
state of the FLIP, I think it is important to lay out all possible use
cases so that we do not change the execution model every release.

There is one additional thing we discussed. Could we change the result
type of TableResult#collect to Iterator<Row>? Even though those
interfaces do not differ much, I think Iterator better describes that
the results might not be materialized on the client side, but can be
retrieved on a per-record basis. The contract of Iterable#iterator
is that it returns a new iterator each time, which effectively means we
can iterate the results multiple times. Iterating the results multiple
times is not possible when we don't retrieve all the results from the
cluster at once.
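The single-pass argument can be demonstrated with plain Java; the `streamedRows` source below is only a stand-in for rows fetched from the cluster:

```java
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicInteger;

public class SinglePassSketch {

    /** Models rows arriving from the cluster: consumable exactly once. */
    static Iterator<Integer> streamedRows(int count) {
        AtomicInteger next = new AtomicInteger();
        return new Iterator<>() {
            public boolean hasNext() { return next.get() < count; }
            public Integer next() { return next.getAndIncrement(); }
        };
    }

    /** Consumes the iterator; a second pass over it finds nothing. */
    static int drain(Iterator<Integer> rows) {
        int n = 0;
        while (rows.hasNext()) { rows.next(); n++; }
        return n;
    }
}
```

An `Iterable` wrapping such a source would have to either buffer everything or silently break the "fresh iterator each time" contract, which is exactly why `Iterator<Row>` is the more honest return type here.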

I think we should also use Iterator for
TableEnvironment#executeMultilineSql(String statements):
Iterator<TableResult>.

Best,

Dawid

On 31/03/2020 19:27, Timo Walther wrote:

> Hi Godfrey,
>
> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
> particular, we discussed how the current status of the FLIP and the
> future requirements around multiline statements, async/sync, collect()
> fit together.
>
> We also updated the FLIP-84 Feedback Summary document [1] with some
> use cases.
>
> We believe that we found a good solution that also fits what is in
> the current FLIP. So no big changes are necessary, which is great!
>
> Our findings were:
>
> 1. Async vs sync submission of Flink jobs:
>
> Having a blocking `execute()` in DataStream API was rather a mistake.
> Instead all submissions should be async because this allows supporting
> both modes if necessary. Thus, submitting all queries async sounds
> good to us. If users want to run a job sync, they can use the
> JobClient and wait for completion (or collect() in case of batch jobs).
>
> 2. Multi-statement execution:
>
> For the multi-statement execution, we don't see a contradiction with
> the async execution behavior. We imagine a method like:
>
> TableEnvironment#executeMultilineSql(String statements):
> Iterable<TableResult>
>
> The `Iterator#next()` method would trigger the submission of the next
> statement. This allows a caller to decide synchronously when to
> submit statements asynchronously to the cluster. Thus, a service such
> as the SQL Client can handle the result of each statement individually
> and process the statements sequentially, one by one.
>
> 3. The role of TableResult and result retrieval in general
>
> `TableResult` is similar to `JobClient`. Instead of returning a
> `CompletableFuture` of something, it is a concrete utility class where
> some methods have the behavior of a completable future (e.g. collect(),
> print()) and some are already completed (getTableSchema(),
> getResultKind()).
>
> `StatementSet#execute()` returns a single `TableResult` because the
> order is undefined in a set and all statements have the same schema.
> Its `collect()` will return a row for each executed `INSERT INTO` in
> the order of statement definition.
>
> For a simple `SELECT * FROM ...`, the query execution might block until
> `collect()` is called to pull buffered rows from the job (via a socket,
> the REST API, or whatever we will use in the future). We can say that
> a statement has finished successfully when `collect()`'s
> `Iterator#hasNext` returns false.
>
> I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
>
> It would be great if we can add these findings to the FLIP before we
> start voting.
>
> One minor thing: some `execute()` methods still throw a checked
> exception; can we remove that from the FLIP? Also the above mentioned
> `Iterator#next()` would trigger an execution without throwing a
> checked exception.
>
> Thanks,
> Timo
>
> [1]
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
>
> On 31.03.20 06:28, godfrey he wrote:
>> Hi, Timo & Jark
>>
>> Thanks for your explanation.
>> Agree with you that execution should always be async,
>> and the sync execution scenario can be covered by async execution.
>> It helps provide a unified entry point for batch and streaming.
>> I think we can also use sync execution for some testing.
>> So, I agree with you that we provide the `executeSql` method and it's an async
>> method.
>> If we want sync method in the future, we can add method named
>> `executeSqlSync`.
>>
>> I think we've reached an agreement. I will update the document, and
>> start
>> voting process.
>>
>> Best,
>> Godfrey
>>
>>
>> Jark Wu <[hidden email]> 于2020年3月31日周二 上午12:46写道:
>>
>>> Hi,
>>>
>>> I didn't follow the full discussion.
>>> But I share the same concern with Timo that streaming queries should
>>> always
>>> be async.
>>> Otherwise, I can imagine it will cause a lot of confusion and problems if
>>> users don't deeply keep the "sync" in mind (e.g. client hangs).
>>> Besides, streaming mode still covers the majority of use cases of Flink
>>> and Flink SQL. We should put usability at a high priority.
>>>
>>> Best,
>>> Jark
>>>
>>>
>>> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:
>>>
>>>> Hi Godfrey,
>>>>
>>>> maybe I wasn't expressing my biggest concern enough in my last mail.
>>>> Even in a single-line and sync execution, I think that streaming
>>>> queries
>>>> should not block the execution. Otherwise it is not possible to call
>>>> collect() or print() on them afterwards.
>>>>
>>>> "there are too many things need to discuss for multiline":
>>>>
>>>> True, I don't want to solve all of them right now. But what I know is
>>>> that our newly introduced methods should fit into a multiline
>>>> execution.
>>>> There is no big difference between calling `executeSql(A),
>>>> executeSql(B)` and
>>>> processing a multiline file `A;\nB;`.
>>>>
>>>> I think the example that you mentioned can simply be undefined for
>>>> now.
>>>> Currently, no catalog is modifying data but just metadata. This is a
>>>> separate discussion.
>>>>
>>>> "result of the second statement is nondeterministic":
>>>>
>>>> Sure, this is nondeterministic. But this is the implementer's fault
>>>> and we
>>>> cannot forbid such pipelines.
>>>>
>>>> How about we always execute streaming queries async? It would unblock
>>>> executeSql() and multiline statements.
>>>>
>>>> Having an `executeSqlAsync()` is useful for batch. However, I don't
>>>> want
>>>> `sync/async` to be the new batch/stream flag. The execution behavior
>>>> should
>>>> come from the query itself.
>>>>
>>>> Regards,
>>>> Timo
>>>>
>>>>
>>>> On 30.03.20 11:12, godfrey he wrote:
>>>>> Hi Timo,
>>>>>
>>>>> Agree with you that streaming queries are our top priority,
>>>>> but I think there are too many things that need discussion for multiline
>>>>> statements:
>>>>> e.g.
>>>>> 1. what's the behavior of DDL and DML mixing for async execution:
>>>>> create table t1 xxx;
>>>>> create table t2 xxx;
>>>>> insert into t2 select * from t1 where xxx;
>>>>> drop table t1; // t1 may be a MySQL table, the data will also be
>>> deleted.
>>>>>
>>>>> t1 is dropped while the "insert" job is running.
>>>>>
>>>>> 2. what's the behavior of the unified scenario for async execution
>>>>> (as you
>>>>> mentioned):
>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>
>>>>> The result of the second statement is nondeterministic, because the
>>>>> first
>>>>> statement may still be running.
>>>>> I think we need to put a lot of effort into defining the behavior of
>>>> logically
>>>>> related queries.
>>>>>
>>>>> In this FLIP, I suggest we only handle single statements, and we also
>>>>> introduce an async execute method,
>>>>> which is more important and more often used by users.
>>>>>
>>>>> For the sync methods (like `TableEnvironment.executeSql` and
>>>>> `StatementSet.execute`),
>>>>> the result will be returned only after the job is finished. The following
>>>>> methods will be introduced in this FLIP:
>>>>>
>>>>> /**
>>>>>  * Asynchronously execute the given single statement.
>>>>>  */
>>>>> TableEnvironment.executeSqlAsync(String statement): TableResult
>>>>>
>>>>> /**
>>>>>  * Asynchronously execute the DML statements as a batch.
>>>>>  */
>>>>> StatementSet.executeAsync(): TableResult
>>>>>
>>>>> public interface TableResult {
>>>>>     /**
>>>>>      * Return the JobClient for DQL and DML in async mode, else return
>>>>>      * Optional.empty().
>>>>>      */
>>>>>     Optional<JobClient> getJobClient();
>>>>> }
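A usage sketch of the proposed contract, with `JobClient` and `TableResult` mocked as minimal stand-ins. The dispatch inside `executeSql` below is a hypothetical simplification invented for illustration, not actual Flink logic: only DML/DQL statements submit a job, so only they carry a `JobClient`.

```java
import java.util.Optional;

// Hedged sketch (mock types, not the real Flink API): DDL results carry
// no JobClient, async DML/DQL results do.
public class AsyncResultSketch {
    static class JobClient { }

    static class TableResult {
        private final JobClient client; // null for DDL and other client-side statements
        TableResult(JobClient client) { this.client = client; }
        Optional<JobClient> getJobClient() { return Optional.ofNullable(client); }
    }

    // Hypothetical dispatch: only statements that submit a Flink job
    // get a JobClient attached to their result.
    static TableResult executeSql(String statement) {
        String s = statement.trim().toUpperCase();
        boolean submitsJob = s.startsWith("INSERT") || s.startsWith("SELECT");
        return new TableResult(submitsJob ? new JobClient() : null);
    }

    public static void main(String[] args) {
        System.out.println(executeSql("CREATE TABLE t (x INT)").getJobClient().isPresent()); // false
        System.out.println(executeSql("INSERT INTO t SELECT 1").getJobClient().isPresent()); // true
    }
}
```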
>>>>>
>>>>> what do you think?
>>>>>
>>>>> Best,
>>>>> Godfrey
>>>>>
>>>>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 9:15 PM:
>>>>>
>>>>>> Hi Godfrey,
>>>>>>
>>>>>> executing streaming queries must be our top priority because this is
>>>>>> what distinguishes Flink from competitors. If we change the
>>>>>> execution
>>>>>> behavior, we should think about the other cases as well to not break
>>> the
>>>>>> API a third time.
>>>>>>
>>>>>> I fear that just having an async execute method will not be enough
>>>>>> because users should be able to mix streaming and batch queries in a
>>>>>> unified scenario.
>>>>>>
>>>>>> If I remember correctly, we had some discussions in the past
>>>>>> about
>>>>>> what decides the execution mode of a query. Currently, we
>>>>>> would
>>>>>> like to let the query decide, not derive it from the sources.
>>>>>>
>>>>>> So I could imagine a multiline pipeline as:
>>>>>>
>>>>>> USE CATALOG 'mycat';
>>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>>
>>>>>> For executeMultilineSql():
>>>>>>
>>>>>> sync because regular SQL
>>>>>> sync because regular Batch SQL
>>>>>> async because Streaming SQL
>>>>>>
>>>>>> For executeAsyncMultilineSql():
>>>>>>
>>>>>> async because everything should be async
>>>>>> async because everything should be async
>>>>>> async because everything should be async
>>>>>>
>>>>>> What we should not start for executeAsyncMultilineSql():
>>>>>>
>>>>>> sync because DDL
>>>>>> async because everything should be async
>>>>>> async because everything should be async
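The per-statement decision in the lists above could be sketched roughly as follows. The `executionMode` function and its classification rules are illustrative assumptions based on the examples in this mail, not an actual Flink API:

```java
// Hedged sketch of a sync/async decision per statement for a hypothetical
// executeMultilineSql(); the classification below is illustrative only.
public class MultilineModeSketch {

    static String executionMode(String statement) {
        String s = statement.trim().toUpperCase();
        if (s.contains("EMIT STREAM")) return "async"; // streaming SQL
        if (s.startsWith("INSERT")) return "sync";     // regular batch SQL
        return "sync";                                 // DDL / catalog statements
    }

    public static void main(String[] args) {
        String[] pipeline = {
            "USE CATALOG 'mycat'",
            "INSERT INTO t1 SELECT * FROM s",
            "INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM"
        };
        for (String stmt : pipeline) {
            System.out.println(executionMode(stmt) + " <- " + stmt);
        }
    }
}
```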
>>>>>>
>>>>>> What are your thoughts here?
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 26.03.20 12:50, godfrey he wrote:
>>>>>>> Hi Timo,
>>>>>>>
>>>>>>> I agree with you that streaming queries mostly need async
>>>>>>> execution.
>>>>>>> In fact, our original plan was to introduce only sync methods in this
>>>>>>> FLIP,
>>>>>>> and async methods (like "executeSqlAsync") would be introduced in the
>>>>>>> future,
>>>>>>> as mentioned in the appendix.
>>>>>>>
>>>>>>> Maybe the async methods also need to be considered in this FLIP.
>>>>>>>
>>>>>>> I think sync methods are also useful for streaming, e.g. to run
>>>>>>> bounded sources.
>>>>>>> Maybe we should check whether all sources are bounded in sync
>>>>>>> execution mode.
>>>>>>>
>>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>>> multiline files. Because the first INSERT INTO would block the
>>> further
>>>>>>>> execution.
>>>>>>> Agree with you, we need an async method to submit multiline files,
>>>>>>> and such files should be restricted so that DQL and DML always
>>>>>>> come at the end for streaming.
>>>>>>>
>>>>>>> Best,
>>>>>>> Godfrey
>>>>>>>
>>>>>>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 4:29 PM:
>>>>>>>
>>>>>>>
>>>>>>>> Hi Godfrey,
>>>>>>>>
>>>>>>>> having control over the job after submission is a requirement that
>>>>>>>> was requested frequently (some examples are [1], [2]). Users would
>>>>>>>> like to get insights about the running or completed job. The
>>>>>>>> JobClient summarizes these properties, including the jobId,
>>>>>>>> jobGraph, etc.
>>>>>>>>
>>>>>>>> It is good to have a discussion about synchronous/asynchronous
>>>>>>>> submission now to have a complete execution picture.
>>>>>>>>
>>>>>>>> I thought we submit streaming queries mostly async and just
>>>>>>>> wait for
>>>> the
>>>>>>>> successful submission. If we block for streaming queries, how
>>>>>>>> can we
>>>>>>>> collect() or print() results?
>>>>>>>>
>>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>>> multiline files. Because the first INSERT INTO would block the
>>> further
>>>>>>>> execution.
>>>>>>>>
>>>>>>>> If we decide to block entirely on streaming queries, we need the
>>> async
>>>>>>>> execution methods in the design already. However, I would
>>>>>>>> rather go
>>>> for
>>>>>>>> non-blocking streaming queries. Also with the `EMIT STREAM` key
>>>>>>>> word
>>>> in
>>>>>>>> mind that we might add to SQL statements soon.
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
>>>>>>>>
>>>>>>>> On 25.03.20 16:30, godfrey he wrote:
>>>>>>>>> Hi Timo,
>>>>>>>>>
>>>>>>>>> Thanks for the updating.
>>>>>>>>>
>>>>>>>>> Regarding "multiline statement support", I'm also fine that
>>>>>>>>> `TableEnvironment.executeSql()` only supports single line
>>> statement,
>>>>>> and
>>>>>>>> we
>>>>>>>>> can support multiline statement later (needs more discussion
>>>>>>>>> about
>>>>>> this).
>>>>>>>>>
>>>>>>>>> Regarding "StatementSet.explain()", I don't have strong
>>>>>>>>> opinions
>>>>>> about
>>>>>>>>> that.
>>>>>>>>>
>>>>>>>>> Regarding "TableResult.getJobClient()", I think it's
>>> unnecessary.
>>>>>> The
>>>>>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx) will
>>>>>>>>> not submit a Flink job. Second, `TableEnvironment.executeSql()` and
>>>>>>>>> `StatementSet.execute()` are synchronous methods; `TableResult` will
>>>>>>>>> be returned only after the job is finished or failed.
>>>>>>>>>
>>>>>>>>> Regarding "whether StatementSet.execute() needs to throw
>>>>>> exception", I
>>>>>>>>> think we should choose a unified way to tell whether the
>>>>>>>>> execution
>>> is
>>>>>>>>> successful. If `TableResult` contains ERROR kind (non-runtime
>>>>>> exception),
>>>>>>>>> users need to not only check the result but also catch the
>>>>>>>>> runtime
>>>>>>>>> exception in their code. Or `StatementSet.execute()` does not
>>>>>>>>> throw
>>>> any
>>>>>>>>> exception (including runtime exception), all exception
>>>>>>>>> messages are
>>>> in
>>>>>>>> the
>>>>>>>>> result.  I prefer "StatementSet.execute() needs to throw
>>> exception".
>>>> cc
>>>>>>>> @Jark
>>>>>>>>> Wu <[hidden email]>
>>>>>>>>>
>>>>>>>>> I will update the agreed parts to the document first.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Timo Walther <[hidden email]> wrote on Wed, Mar 25, 2020 at 6:51 PM:
>>>>>>>>>
>>>>>>>>>> Hi Godfrey,
>>>>>>>>>>
>>>>>>>>>> thanks for starting the discussion on the mailing list. And
>>>>>>>>>> sorry
>>>>>> again
>>>>>>>>>> for the late reply to FLIP-84. I have updated the Google doc one
>>>> more
>>>>>>>>>> time to incorporate the offline discussions.
>>>>>>>>>>
>>>>>>>>>> From Dawid's and my view, it is fine to postpone the multiline
>>>>>>>>>> support
>>>>>>>>>> to a separate method. This can be future work even though we
>>>>>>>>>> will
>>>> need
>>>>>>>>>> it rather soon.
>>>>>>>>>>
>>>>>>>>>> If there are no objections, I suggest updating FLIP-84 again
>>>>>>>>>> and having another voting process.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
>>>>>>>>>>> Hi community,
>>>>>>>>>>> Timo, Fabian and Dawid have some feedbacks about FLIP-84[1].
>>>>>>>>>>> The
>>>>>>>>>> feedbacks
>>>>>>>>>>> are all about new introduced methods. We had a discussion
>>>> yesterday,
>>>>>>>> and
>>>>>>>>>>> most of feedbacks have been agreed upon. Here is the
>>>>>>>>>>> conclusions:
>>>>>>>>>>>
>>>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
>>>>>>>>>>>
>>>>>>>>>>> the original proposed methods:
>>>>>>>>>>>
>>>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
>>>>>>>>>>> TableEnvironment.executeStatement(String statement):
>>>>>>>>>>> ResultTable
>>>>>>>>>>>
>>>>>>>>>>> the new proposed methods:
>>>>>>>>>>>
>>>>>>>>>>> // we should not use abbreviations in the API, and the term
>>> "Batch"
>>>>>> is
>>>>>>>>>>> easily confused with batch/streaming processing
>>>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
>>>>>>>>>>>
>>>>>>>>>>> // every method that takes SQL should have `Sql` in its name
>>>>>>>>>>> // supports multiline statement ???
>>>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
>>>>>>>>>>>
>>>>>>>>>>> // new methods. supports explaining DQL and DML
>>>>>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
>>>>>>>> details):
>>>>>>>>>>> String
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *2. about proposed related classes:*
>>>>>>>>>>>
>>>>>>>>>>> the original proposed classes:
>>>>>>>>>>>
>>>>>>>>>>> interface DmlBatch {
>>>>>>>>>>>          void addInsert(String insert);
>>>>>>>>>>>          void addInsert(String targetPath, Table table);
>>>>>>>>>>>          ResultTable execute() throws Exception ;
>>>>>>>>>>>          String explain(boolean extended);
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> public interface ResultTable {
>>>>>>>>>>>          TableSchema getResultSchema();
>>>>>>>>>>>          Iterable<Row> getResultRows();
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> the new proposed classes:
>>>>>>>>>>>
>>>>>>>>>>> interface StatementSet {
>>>>>>>>>>          // every method that takes SQL should have `Sql` in its name
>>>>>>>>>>>          // return StatementSet instance for fluent programming
>>>>>>>>>>>          addInsertSql(String statement): StatementSet
>>>>>>>>>>>
>>>>>>>>>>>          // return StatementSet instance for fluent programming
>>>>>>>>>>>          addInsert(String tablePath, Table table): StatementSet
>>>>>>>>>>>
>>>>>>>>>>>          // new method. support overwrite mode
>>>>>>>>>>          addInsert(String tablePath, Table table, boolean overwrite): StatementSet
>>>>>>>>>>>
>>>>>>>>>>>          explain(): String
>>>>>>>>>>>
>>>>>>>>>>          // new method. supports adding more details for the result
>>>>>>>>>>>          explain(ExplainDetail... extraDetails): String
>>>>>>>>>>>
>>>>>>>>>>>          // throw exception ???
>>>>>>>>>>>          execute(): TableResult
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> interface TableResult {
>>>>>>>>>>>          getTableSchema(): TableSchema
>>>>>>>>>>>
>>>>>>>>>>>          // avoid custom parsing of an "OK" row in programming
>>>>>>>>>>>          getResultKind(): ResultKind
>>>>>>>>>>>
>>>>>>>>>>          // instead of `get`, make it explicit that this might be
>>>>>>>>>>          // triggering an expensive operation
>>>>>>>>>>          collect(): Iterable<Row>
>>>>>>>>>>>
>>>>>>>>>>>          // for fluent programming
>>>>>>>>>>>          print(): Unit
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> enum ResultKind {
>>>>>>>>>>          SUCCESS, // for DDL, DCL and statements with a simple "OK"
>>>>>>>>>>          SUCCESS_WITH_CONTENT, // rows with important content are
>>>>>>>>>>          // available (DML, DQL)
>>>>>>>>>>> }
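A hypothetical usage sketch of the fluent `StatementSet` and `ResultKind` proposal above. All classes here are minimal stand-ins written for illustration, not the real implementation; only the method names follow the proposal:

```java
import java.util.ArrayList;
import java.util.List;

// Hedged sketch of the proposed fluent API (mock implementation).
public class StatementSetUsage {
    enum ResultKind { SUCCESS, SUCCESS_WITH_CONTENT }

    static class TableResult {
        ResultKind getResultKind() { return ResultKind.SUCCESS_WITH_CONTENT; }
    }

    static class StatementSet {
        private final List<String> statements = new ArrayList<>();
        // returns this for fluent programming, as proposed
        StatementSet addInsertSql(String statement) { statements.add(statement); return this; }
        String explain() { return "plan for " + statements.size() + " statement(s)"; }
        TableResult execute() { return new TableResult(); }
    }

    public static void main(String[] args) {
        TableResult result = new StatementSet()
                .addInsertSql("INSERT INTO t1 SELECT * FROM s")
                .addInsertSql("INSERT INTO t2 SELECT * FROM s")
                .execute();
        // avoid parsing an "OK" row: inspect the result kind instead
        System.out.println(result.getResultKind());
    }
}
```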
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> *3. new proposed methods in `Table`*
>>>>>>>>>>>
>>>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
>>> methods
>>>>>> are
>>>>>>>>>>> introduced:
>>>>>>>>>>>
>>>>>>>>>>> Table.executeInsert(String tablePath): TableResult
>>>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite): TableResult
>>>>>>>>>>> Table.explain(ExplainDetail... details): String
>>>>>>>>>>> Table.execute(): TableResult
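A usage sketch of these new `Table` methods replacing the deprecated `Table.insertInto()`. The `Table` and `TableResult` types below are mocks invented for illustration; only the method names follow the proposal:

```java
// Hedged sketch of the proposed Table methods (mock types, not Flink's).
public class TableMethodsSketch {
    static class TableResult {
        final String description;
        TableResult(String description) { this.description = description; }
    }

    static class Table {
        TableResult executeInsert(String tablePath) { return executeInsert(tablePath, false); }
        // overwrite == true corresponds to INSERT OVERWRITE semantics
        TableResult executeInsert(String tablePath, boolean overwrite) {
            return new TableResult("insert into " + tablePath + (overwrite ? " (overwrite)" : ""));
        }
        String explain() { return "== Physical Plan =="; }
        TableResult execute() { return new TableResult("select"); }
    }

    public static void main(String[] args) {
        Table table = new Table();
        System.out.println(table.executeInsert("sinkTable").description);
        System.out.println(table.executeInsert("sinkTable", true).description);
        System.out.println(table.explain());
    }
}
```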
>>>>>>>>>>>
>>>>>>>>>>> There are two issues that need further discussion: one is whether
>>>>>>>>>>> `TableEnvironment.executeSql(String statement): TableResult` needs
>>>>>>>>>>> to support multiline statements (or whether `TableEnvironment`
>>>>>>>>>>> needs to support multiline statements), and the other is whether
>>>>>>>>>>> `StatementSet.execute()` needs to throw exceptions.
>>>>>>>>>>>
>>>>>>>>>>> please refer to the feedback document [2] for the details.
>>>>>>>>>>>
>>>>>>>>>>> Any suggestions are warmly welcomed!
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>>
>>>>>>>>>>> [2]
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>
>>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
>>>
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Godfrey
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>
>>
>



Re: [DISCUSS] FLIP-84 Feedback Summary

godfreyhe
Hi, Timo & Dawid,

Thanks so much for the effort on `multiline statements support`.
I have a few questions about this method:

1. Users can control the execution logic well through the proposed method
if they know what the statements are (whether a statement is a DDL, a DML
or something else).
But if the statements come from a file, users do not know what they are,
so the execution behavior is unclear.
As a platform user, I think this method is hard to use unless the platform
defines a set of rules about statement order, such as: no SELECT in the
middle, DML must be at the tail of the SQL file (which may be the most
common case in production environments).
Otherwise, the platform must parse the SQL first to know what the
statements are.
If it does that, the platform can handle all cases through `executeSql` and
`StatementSet` anyway.

2. The SQL Client also can't use `executeMultilineSql` to support multiline
statements, because there are some special commands introduced in the SQL
Client, such as `quit`, `source`, `load jar` (the last one does not exist
now, but maybe we need this command to support dynamic table sources and
UDFs).
Does TableEnvironment also support those commands?

3. btw, must we have this feature in release-1.11? I find there are a few
use cases in the feedback document whose behavior is unclear now.

Regarding "change the return value from `Iterable<Row>` to
`Iterator<Row>`", I couldn't agree more with this change. Just as Dawid
mentioned, "The contract of the Iterable#iterator is that it returns a new
iterator each time, which effectively means we can iterate the results
multiple times." We do not support iterating the results multiple times.
If we wanted to, the client would have to buffer all results, which is
impossible for a streaming job.
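The difference between the two contracts can be demonstrated with plain JDK types, no Flink involved: an `Iterable` hands out a fresh `Iterator` on every call, so the results can be walked many times, while a bare `Iterator` is single-pass, which matches streaming results that cannot be fully buffered and replayed.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Illustration of the Iterable vs Iterator contract (plain JDK types).
public class IteratorContract {

    // Drains an iterator and counts how many elements it produced.
    static int consume(Iterator<?> it) {
        int count = 0;
        while (it.hasNext()) { it.next(); count++; }
        return count;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3);

        // Iterable: each call to iterator() starts a fresh pass.
        System.out.println(consume(rows.iterator()) + " " + consume(rows.iterator()));

        // Iterator: once drained, a second pass sees nothing.
        Iterator<Integer> once = rows.iterator();
        System.out.println(consume(once) + " " + consume(once));
    }
}
```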

Best,
Godfrey

Dawid Wysakowicz <[hidden email]> wrote on Wed, Apr 1, 2020 at 3:14 AM:

> Thank you Timo for the great summary! It covers (almost) all the topics.
> Even though in the end we are not suggesting much changes to the current
> state of FLIP I think it is important to lay out all possible use cases
> so that we do not change the execution model every release.
>
> There is one additional thing we discussed. Could we change the result
> type of TableResult#collect to Iterator<Row>? Even though those
> interfaces do not differ much, I think Iterator better describes that
> the results might not be materialized on the client side, but can be
> retrieved on a per record basis. The contract of the Iterable#iterator
> is that it returns a new iterator each time, which effectively means we
> can iterate the results multiple times. Iterating the results is not
> possible when we don't retrieve all the results from the cluster at once.
>
> I think we should also use Iterator for
> TableEnvironment#executeMultilineSql(String statements):
> Iterator<TableResult>.
>
> Best,
>
> Dawid
>
> On 31/03/2020 19:27, Timo Walther wrote:
> > Hi Godfrey,
> >
> > Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
> > particular, we discussed how the current status of the FLIP and the
> > future requirements around multiline statements, async/sync, collect()
> > fit together.
> >
> > We also updated the FLIP-84 Feedback Summary document [1] with some
> > use cases.
> >
> > We believe that we found a good solution that also fits to what is in
> > the current FLIP. So no bigger changes are necessary, which is great!
> >
> > Our findings were:
> >
> > 1. Async vs sync submission of Flink jobs:
> >
> > Having a blocking `execute()` in DataStream API was rather a mistake.
> > Instead all submissions should be async because this allows supporting
> > both modes if necessary. Thus, submitting all queries async sounds
> > good to us. If users want to run a job sync, they can use the
> > JobClient and wait for completion (or collect() in case of batch jobs).
> >
> > 2. Multi-statement execution:
> >
> > For the multi-statement execution, we don't see a contradiction with
> > the async execution behavior. We imagine a method like:
> >
> > TableEnvironment#executeMultilineSql(String statements):
> > Iterable<TableResult>
> >
> > Where the `Iterator#next()` method would trigger the next statement
> > submission. This allows a caller to decide synchronously when to
> > submit statements async to the cluster. Thus, a service such as the
> > SQL Client can handle the result of each statement individually and
> > process statement by statement sequentially.
> >
> > 3. The role of TableResult and result retrieval in general
> >
> > `TableResult` is similar to `JobClient`. Instead of returning a
> > `CompletableFuture` of something, it is a concrete util class where
> > some methods have the behavior of completable future (e.g. collect(),
> > print()) and some are already completed (getTableSchema(),
> > getResultKind()).
> >
> > `StatementSet#execute()` returns a single `TableResult` because the
> > order is undefined in a set and all statements have the same schema.
> > Its `collect()` will return a row for each executed `INSERT INTO` in
> > the order of statement definition.
> >
> > For simple `SELECT * FROM ...`, the query execution might block until
> > `collect()` is called to pull buffered rows from the job (from
> > socket/REST API, whatever we will use in the future). We can say that
> > a statement has finished successfully when `collect()`'s
> > `Iterator#hasNext` has returned false.
> >
> > I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
> >
> > It would be great if we can add these findings to the FLIP before we
> > start voting.
> >
> > One minor thing: some `execute()` methods still throw a checked
> > exception; can we remove that from the FLIP? Also the above mentioned
> > `Iterator#next()` would trigger an execution without throwing a
> > checked exception.
> >
> > Thanks,
> > Timo
> >
> > [1]
> >
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
> >
> > On 31.03.20 06:28, godfrey he wrote:
> >> Hi, Timo & Jark
> >>
> >> Thanks for your explanation.
> >> Agree with you that async execution should always be async,
> >> and sync execution scenario can be covered  by async execution.
> >> It helps provide an unified entry point for batch and streaming.
> >> I think we can also use sync execution for some testing.
> >> So, I agree with you that we provide `executeSql` method and it's async
> >> method.
> >> If we want sync method in the future, we can add method named
> >> `executeSqlSync`.
> >>
> >> I think we've reached an agreement. I will update the document, and
> >> start
> >> voting process.
> >>
> >> Best,
> >> Godfrey
> >>
> >>
> >> Jark Wu <[hidden email]> 于2020年3月31日周二 上午12:46写道:
> >>
> >>> Hi,
> >>>
> >>> I didn't follow the full discussion.
> >>> But I share the same concern with Timo that streaming queries should
> >>> always
> >>> be async.
> >>> Otherwise, I can image it will cause a lot of confusion and problems if
> >>> users don't deeply keep the "sync" in mind (e.g. client hangs).
> >>> Besides, the streaming mode is still the majority use cases of Flink
> >>> and
> >>> Flink SQL. We should put the usability at a high priority.
> >>>
> >>> Best,
> >>> Jark
> >>>
> >>>
> >>> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:
> >>>
> >>>> Hi Godfrey,
> >>>>
> >>>> maybe I wasn't expressing my biggest concern enough in my last mail.
> >>>> Even in a singleline and sync execution, I think that streaming
> >>>> queries
> >>>> should not block the execution. Otherwise it is not possible to call
> >>>> collect() or print() on them afterwards.
> >>>>
> >>>> "there are too many things need to discuss for multiline":
> >>>>
> >>>> True, I don't want to solve all of them right now. But what I know is
> >>>> that our newly introduced methods should fit into a multiline
> >>>> execution.
> >>>> There is no big difference of calling `executeSql(A),
> >>>> executeSql(B)` and
> >>>> processing a multiline file `A;\nB;`.
> >>>>
> >>>> I think the example that you mentioned can simply be undefined for
> >>>> now.
> >>>> Currently, no catalog is modifying data but just metadata. This is a
> >>>> separate discussion.
> >>>>
> >>>> "result of the second statement is indeterministic":
> >>>>
> >>>> Sure this is indeterministic. But this is the implementers fault
> >>>> and we
> >>>> cannot forbid such pipelines.
> >>>>
> >>>> How about we always execute streaming queries async? It would unblock
> >>>> executeSql() and multiline statements.
> >>>>
> >>>> Having a `executeSqlAsync()` is useful for batch. However, I don't
> >>>> want
> >>>> `sync/async` be the new batch/stream flag. The execution behavior
> >>>> should
> >>>> come from the query itself.
> >>>>
> >>>> Regards,
> >>>> Timo
> >>>>
> >>>>
> >>>> On 30.03.20 11:12, godfrey he wrote:
> >>>>> Hi Timo,
> >>>>>
> >>>>> Agree with you that streaming queries is our top priority,
> >>>>> but I think there are too many things need to discuss for multiline
> >>>>> statements:
> >>>>> e.g.
> >>>>> 1. what's the behaivor of DDL and DML mixing for async execution:
> >>>>> create table t1 xxx;
> >>>>> create table t2 xxx;
> >>>>> insert into t2 select * from t1 where xxx;
> >>>>> drop table t1; // t1 may be a MySQL table, the data will also be
> >>> deleted.
> >>>>>
> >>>>> t1 is dropped when "insert" job is running.
> >>>>>
> >>>>> 2. what's the behaivor of unified scenario for async execution:
> >>>>> (as you
> >>>>> mentioned)
> >>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>
> >>>>> The result of the second statement is indeterministic, because the
> >>> first
> >>>>> statement maybe is running.
> >>>>> I think we need to put a lot of effort to define the behavior of
> >>>> logically
> >>>>> related queries.
> >>>>>
> >>>>> In this FLIP, I suggest we only handle single statement, and we also
> >>>>> introduce an async execute method
> >>>>> which is more important and more often used for users.
> >>>>>
> >>>>> Dor the sync methods (like `TableEnvironment.executeSql` and
> >>>>> `StatementSet.execute`),
> >>>>> the result will be returned until the job is finished. The following
> >>>>> methods will be introduced in this FLIP:
> >>>>>
> >>>>>    /**
> >>>>>     * Asynchronously execute the given single statement
> >>>>>     */
> >>>>> TableEnvironment.executeSqlAsync(String statement): TableResult
> >>>>>
> >>>>> /**
> >>>>>    * Asynchronously execute the dml statements as a batch
> >>>>>    */
> >>>>> StatementSet.executeAsync(): TableResult
> >>>>>
> >>>>> public interface TableResult {
> >>>>>      /**
> >>>>>       * return JobClient for DQL and DML in async mode, else return
> >>>>> Optional.empty
> >>>>>       */
> >>>>>      Optional<JobClient> getJobClient();
> >>>>> }
> >>>>>
> >>>>> what do you think?
> >>>>>
> >>>>> Best,
> >>>>> Godfrey
> >>>>>
> >>>>> Timo Walther <[hidden email]> 于2020年3月26日周四 下午9:15写道:
> >>>>>
> >>>>>> Hi Godfrey,
> >>>>>>
> >>>>>> executing streaming queries must be our top priority because this is
> >>>>>> what distinguishes Flink from competitors. If we change the
> >>>>>> execution
> >>>>>> behavior, we should think about the other cases as well to not break
> >>> the
> >>>>>> API a third time.
> >>>>>>
> >>>>>> I fear that just having an async execute method will not be enough
> >>>>>> because users should be able to mix streaming and batch queries in a
> >>>>>> unified scenario.
> >>>>>>
> >>>>>> If I remember it correctly, we had some discussions in the past
> >>>>>> about
> >>>>>> what decides about the execution mode of a query. Currently, we
> >>>>>> would
> >>>>>> like to let the query decide, not derive it from the sources.
> >>>>>>
> >>>>>> So I could image a multiline pipeline as:
> >>>>>>
> >>>>>> USE CATALOG 'mycat';
> >>>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>>
> >>>>>> For executeMultilineSql():
> >>>>>>
> >>>>>> sync because regular SQL
> >>>>>> sync because regular Batch SQL
> >>>>>> async because Streaming SQL
> >>>>>>
> >>>>>> For executeAsyncMultilineSql():
> >>>>>>
> >>>>>> async because everything should be async
> >>>>>> async because everything should be async
> >>>>>> async because everything should be async
> >>>>>>
> >>>>>> What we should not start for executeAsyncMultilineSql():
> >>>>>>
> >>>>>> sync because DDL
> >>>>>> async because everything should be async
> >>>>>> async because everything should be async
> >>>>>>
> >>>>>> What are you thoughts here?
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 26.03.20 12:50, godfrey he wrote:
> >>>>>>> Hi Timo,
> >>>>>>>
> >>>>>>> I agree with you that streaming queries mostly need async
> >>>>>>> execution.
> >>>>>>> In fact, our original plan is only introducing sync methods in this
> >>>> FLIP,
> >>>>>>> and async methods (like "executeSqlAsync") will be introduced in
> >>>>>>> the
> >>>>>> future
> >>>>>>> which is mentioned in the appendix.
> >>>>>>>
> >>>>>>> Maybe the async methods also need to be considered in this FLIP.
> >>>>>>>
> >>>>>>> I think sync methods is also useful for streaming which can be used
> >>> to
> >>>>>> run
> >>>>>>> bounded source.
> >>>>>>> Maybe we should check whether all sources are bounded in sync
> >>> execution
> >>>>>>> mode.
> >>>>>>>
> >>>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>>> multiline files. Because the first INSERT INTO would block the
> >>> further
> >>>>>>>> execution.
> >>>>>>> agree with you, we need async method to submit multiline files,
> >>>>>>> and files should be limited that the DQL and DML should be
> >>>>>>> always in
> >>>> the
> >>>>>>> end for streaming.
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Godfrey
> >>>>>>>
> >>>>>>> Timo Walther <[hidden email]> 于2020年3月26日周四 下午4:29写道:
> >>>>>>>
> >>>>>>>
> >>>>>>>> Hi Godfrey,
> >>>>>>>>
> >>>>>>>> having control over the job after submission is a requirement that
> >>> was
> >>>>>>>> requested frequently (some examples are [1], [2]). Users would
> >>>>>>>> like
> >>> to
> >>>>>>>> get insights about the running or completed job. Including the
> >>> jobId,
> >>>>>>>> jobGraph etc., the JobClient summarizes these properties.
> >>>>>>>>
> >>>>>>>> It is good to have a discussion about synchronous/asynchronous
> >>>>>>>> submission now to have a complete execution picture.
> >>>>>>>>
> >>>>>>>> I thought we submit streaming queries mostly async and just
> >>>>>>>> wait for
> >>>> the
> >>>>>>>> successful submission. If we block for streaming queries, how
> >>>>>>>> can we
> >>>>>>>> collect() or print() results?
> >>>>>>>>
> >>>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>>> multiline files. Because the first INSERT INTO would block the
> >>> further
> >>>>>>>> execution.
> >>>>>>>>
> >>>>>>>> If we decide to block entirely on streaming queries, we need the
> >>> async
> >>>>>>>> execution methods in the design already. However, I would
> >>>>>>>> rather go
> >>>> for
> >>>>>>>> non-blocking streaming queries. Also with the `EMIT STREAM` key
> >>>>>>>> word
> >>>> in
> >>>>>>>> mind that we might add to SQL statements soon.
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>>>>>
> >>>>>>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>>>>>> Hi Timo,
> >>>>>>>>>
> >>>>>>>>> Thanks for the updating.
> >>>>>>>>>
> >>>>>>>>> Regarding to "multiline statement support", I'm also fine that
> >>>>>>>>> `TableEnvironment.executeSql()` only supports single line
> >>> statement,
> >>>>>> and
> >>>>>>>> we
> >>>>>>>>> can support multiline statement later (needs more discussion
> >>>>>>>>> about
> >>>>>> this).
> >>>>>>>>>
> >>>>>>>>> Regarding "StatementSet.explain()", I don't have strong opinions
> >>>>>>>>> about that.
> >>>>>>>>>
> >>>>>>>>> Regarding to "TableResult.getJobClient()", I think it's
> >>> unnecessary.
> >>>>>> The
> >>>>>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)
> >>>>>>>>> will
> >>>> not
> >>>>>>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
> >>>>>>>>> `StatementSet.execute()` are synchronous method, `TableResult`
> >>>>>>>>> will
> >>>> be
> >>>>>>>>> returned only after the job is finished or failed.
> >>>>>>>>>
> >>>>>>>>> Regarding to "whether StatementSet.execute() needs to throw
> >>>>>> exception", I
> >>>>>>>>> think we should choose a unified way to tell whether the
> >>>>>>>>> execution
> >>> is
> >>>>>>>>> successful. If `TableResult` contains ERROR kind (non-runtime
> >>>>>> exception),
> >>>>>>>>> users need to not only check the result but also catch the
> >>>>>>>>> runtime
> >>>>>>>>> exception in their code. or `StatementSet.execute()` does not
> >>>>>>>>> throw
> >>>> any
> >>>>>>>>> exception (including runtime exception), all exception
> >>>>>>>>> messages are
> >>>> in
> >>>>>>>> the
> >>>>>>>>> result.  I prefer "StatementSet.execute() needs to throw
> >>> exception".
> >>>> cc
> >>>>>>>> @Jark
> >>>>>>>>> Wu <[hidden email]>
> >>>>>>>>>
> >>>>>>>>> I will update the agreed parts to the document first.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Godfrey
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Timo Walther <[hidden email]> wrote on Wed, 25 Mar 2020 at 18:51:
> >>>>>>>>>
> >>>>>>>>>> Hi Godfrey,
> >>>>>>>>>>
> >>>>>>>>>> thanks for starting the discussion on the mailing list. And
> >>>>>>>>>> sorry
> >>>>>> again
> >>>>>>>>>> for the late reply to FLIP-84. I have updated the Google doc one
> >>>> more
> >>>>>>>>>> time to incorporate the offline discussions.
> >>>>>>>>>>
> >>>>>>>>>>      From Dawid's and my view, it is fine to postpone the
> >>>>>>>>>> multiline
> >>>>>> support
> >>>>>>>>>> to a separate method. This can be future work even though we
> >>>>>>>>>> will
> >>>> need
> >>>>>>>>>> it rather soon.
> >>>>>>>>>>
> >>>>>>>>>> If there are no objections, I suggest to update the FLIP-84
> >>>>>>>>>> again
> >>>> and
> >>>>>>>>>> have another voting process.
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>>>>>>>> Hi community,
> >>>>>>>>>>> Timo, Fabian and Dawid have some feedbacks about FLIP-84[1].
> >>>>>>>>>>> The
> >>>>>>>>>> feedbacks
> >>>>>>>>>>> are all about new introduced methods. We had a discussion
> >>>> yesterday,
> >>>>>>>> and
> >>>>>>>>>>> most of feedbacks have been agreed upon. Here is the
> >>>>>>>>>>> conclusions:
> >>>>>>>>>>>
> >>>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
> >>>>>>>>>>>
> >>>>>>>>>>> the original proposed methods:
> >>>>>>>>>>>
> >>>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>>>>>>>> TableEnvironment.executeStatement(String statement):
> >>>>>>>>>>> ResultTable
> >>>>>>>>>>>
> >>>>>>>>>>> the new proposed methods:
> >>>>>>>>>>>
> >>>>>>>>>>> // we should not use abbreviations in the API, and the term
> >>> "Batch"
> >>>>>> is
> >>>>>>>>>>> easily confused with batch/streaming processing
> >>>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>>>>>>>
> >>>>>>>>>>> // every method that takes SQL should have `Sql` in its name
> >>>>>>>>>>> // supports multiline statement ???
> >>>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>>>>>>>
> >>>>>>>>>>> // new methods. supports explaining DQL and DML
> >>>>>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
> >>>>>>>> details):
> >>>>>>>>>>> String
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> *2. about proposed related classes:*
> >>>>>>>>>>>
> >>>>>>>>>>> the original proposed classes:
> >>>>>>>>>>>
> >>>>>>>>>>> interface DmlBatch {
> >>>>>>>>>>>          void addInsert(String insert);
> >>>>>>>>>>>          void addInsert(String targetPath, Table table);
> >>>>>>>>>>>          ResultTable execute() throws Exception ;
> >>>>>>>>>>>          String explain(boolean extended);
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> public interface ResultTable {
> >>>>>>>>>>>          TableSchema getResultSchema();
> >>>>>>>>>>>          Iterable<Row> getResultRows();
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> the new proposed classes:
> >>>>>>>>>>>
> >>>>>>>>>>> interface StatementSet {
> >>>>>>>>>>>          // every method that takes SQL should have `Sql` in
> >>>>>>>>>>> its
> >>>> name
> >>>>>>>>>>>          // return StatementSet instance for fluent programming
> >>>>>>>>>>>          addInsertSql(String statement): StatementSet
> >>>>>>>>>>>
> >>>>>>>>>>>          // return StatementSet instance for fluent programming
> >>>>>>>>>>>          addInsert(String tablePath, Table table): StatementSet
> >>>>>>>>>>>
> >>>>>>>>>>>          // new method. support overwrite mode
> >>>>>>>>>>>          addInsert(String tablePath, Table table, boolean
> >>>> overwrite):
> >>>>>>>>>>> StatementSet
> >>>>>>>>>>>
> >>>>>>>>>>>          explain(): String
> >>>>>>>>>>>
> >>>>>>>>>>>          // new method. supports adding more details for the
> >>> result
> >>>>>>>>>>>          explain(ExplainDetail... extraDetails): String
> >>>>>>>>>>>
> >>>>>>>>>>>          // throw exception ???
> >>>>>>>>>>>          execute(): TableResult
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> interface TableResult {
> >>>>>>>>>>>          getTableSchema(): TableSchema
> >>>>>>>>>>>
> >>>>>>>>>>>          // avoid custom parsing of an "OK" row in programming
> >>>>>>>>>>>          getResultKind(): ResultKind
> >>>>>>>>>>>
> >>>>>>>>>>>          // instead of `get` make it explicit that this is
> >>>>>>>>>>> might
> >>> be
> >>>>>>>>>> triggering
> >>>>>>>>>>> an expensive operation
> >>>>>>>>>>>          collect(): Iterable<Row>
> >>>>>>>>>>>
> >>>>>>>>>>>          // for fluent programming
> >>>>>>>>>>>          print(): Unit
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>> enum ResultKind {
> >>>>>>>>>>>          SUCCESS, // for DDL, DCL and statements with a simple
> >>> "OK"
> >>>>>>>>>>>          SUCCESS_WITH_CONTENT, // rows with important
> >>>>>>>>>>> content are
> >>>>>>>> available
> >>>>>>>>>>> (DML, DQL)
> >>>>>>>>>>> }
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> *3. new proposed methods in `Table`*
> >>>>>>>>>>>
> >>>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
> >>> methods
> >>>>>> are
> >>>>>>>>>>> introduced:
> >>>>>>>>>>>
> >>>>>>>>>>> Table.executeInsert(String tablePath): TableResult
> >>>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
> >>>> TableResult
> >>>>>>>>>>> Table.explain(ExplainDetail... details): String
> >>>>>>>>>>> Table.execute(): TableResult
> >>>>>>>>>>>
> >>>>>>>>>>> There are two issues need further discussion, one is whether
> >>>>>>>>>>> `TableEnvironment.executeSql(String statement): TableResult`
> >>> needs
> >>>> to
> >>>>>>>>>>> support multiline statement (or whether `TableEnvironment`
> >>>>>>>>>>> needs
> >>> to
> >>>>>>>>>> support
> >>>>>>>>>>> multiline statement), and another one is whether
> >>>>>>>> `StatementSet.execute()`
> >>>>>>>>>>> needs to throw exception.
> >>>>>>>>>>>
> >>>>>>>>>>> please refer to the feedback document [2] for the details.
> >>>>>>>>>>>
> >>>>>>>>>>> Any suggestions are warmly welcomed!
> >>>>>>>>>>>
> >>>>>>>>>>> [1]
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>
> >>>>>>>>>>> [2]
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>
> >>>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Godfrey
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>>
> >>>
> >>
> >
>
>

Re: [DISCUSS] FLIP-84 Feedback Summary

godfreyhe
In reply to this post by Timo Walther-2
Hi Timo,

Regarding "`execute` method throws checked exception": does that mean we
should convert the checked exception to an unchecked exception, or that we
need to add an ERROR kind to `ResultKind`?

For the second approach, I still think it's not convenient for users to
check for errors when calling the `collect` and `print` methods.
The code would look like:

// Option 1: add a `getError()` method to TableResult and store the exception
// in the TableResult itself
TableResult result = tEnv.executeSql("select xxx");
if (result.getResultKind() == ResultKind.ERROR) {
    System.out.println(result.getError());
} else {
    Iterator<Row> it = result.collect();
    // consume it ...
}

// Option 2: treat the exception as a kind of result, and get the exception
// through the `collect` method
TableResult result = tEnv.executeSql("select xxx");
if (result.getResultKind() == ResultKind.ERROR) {
    Iterator<Row> it = result.collect();
    Row row = it.next();
    System.out.println(row.getField(0));
} else {
    Iterator<Row> it = result.collect();
    // consume it ...
}

// and neither option leaves room for an error check in fluent programming:
Iterator<Row> it = tEnv.executeSql("select xxx").collect();
// consume it ...
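For comparison, here is a minimal, self-contained sketch of the alternative where execution throws on failure. `TableResult` and `executeSql` below are simplified stand-ins for the proposed API, not the real Flink interfaces:

```java
// Sketch only: the "execute throws" style, using simplified stand-ins for
// the proposed API (TableResult, executeSql); this is not Flink code.
import java.util.Iterator;
import java.util.List;

public class ErrorHandlingSketch {
    interface TableResult {
        Iterator<String> collect();
    }

    // A failure surfaces as an exception, so a returned TableResult is
    // always a usable (successful) result.
    static TableResult executeSql(String statement) {
        if (statement.contains("bad")) {
            throw new RuntimeException("could not execute: " + statement);
        }
        return () -> List.of("row1", "row2").iterator();
    }

    public static void main(String[] args) {
        try {
            Iterator<String> it = executeSql("select xxx").collect();
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        } catch (RuntimeException e) {
            // no ResultKind.ERROR branch: the error path is the catch block
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

With this contract the fluent form stays safe as well: a failure raises before `collect()` is ever reached, so there is never a result object whose kind must be inspected.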

Best,
Godfrey

Timo Walther <[hidden email]> wrote on Wed, 1 Apr 2020 at 01:27:

> Hi Godfrey,
>
> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
> particular, we discussed how the current status of the FLIP and the
> future requirements around multiline statements, async/sync, collect()
> fit together.
>
> We also updated the FLIP-84 Feedback Summary document [1] with some use
> cases.
>
> We believe that we found a good solution that also fits to what is in
> the current FLIP. So no bigger changes necessary, which is great!
>
> Our findings were:
>
> 1. Async vs sync submission of Flink jobs:
>
> Having a blocking `execute()` in DataStream API was rather a mistake.
> Instead all submissions should be async because this allows supporting
> both modes if necessary. Thus, submitting all queries async sounds good
> to us. If users want to run a job sync, they can use the JobClient and
> wait for completion (or collect() in case of batch jobs).
>
> 2. Multi-statement execution:
>
> For the multi-statement execution, we don't see a contradiction with
> the async execution behavior. We imagine a method like:
>
> TableEnvironment#executeMultilineSql(String statements):
> Iterable<TableResult>
>
> Where the `Iterator#next()` method would trigger the next statement
> submission. This allows a caller to decide synchronously when to submit
> statements async to the cluster. Thus, a service such as the SQL Client
> can handle the result of each statement individually and process
> statement by statement sequentially.
>
> 3. The role of TableResult and result retrieval in general
>
> `TableResult` is similar to `JobClient`. Instead of returning a
> `CompletableFuture` of something, it is a concrete util class where some
> methods have the behavior of completable future (e.g. collect(),
> print()) and some are already completed (getTableSchema(),
> getResultKind()).
>
> `StatementSet#execute()` returns a single `TableResult` because the
> order is undefined in a set and all statements have the same schema. Its
> `collect()` will return a row for each executed `INSERT INTO` in the
> order of statement definition.
>
> For simple `SELECT * FROM ...`, the query execution might block until
> `collect()` is called to pull buffered rows from the job (from
> the socket/REST API, or whatever we will use in the future). We can say that a
> statement finished successfully, when the `collect#Iterator#hasNext` has
> returned false.
>
> I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
>
> It would be great if we can add these findings to the FLIP before we
> start voting.
>
> One minor thing: some `execute()` methods still throw a checked
> exception; can we remove that from the FLIP? Also the above mentioned
> `Iterator#next()` would trigger an execution without throwing a checked
> exception.
>
> Thanks,
> Timo
>
> [1]
>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
>
> On 31.03.20 06:28, godfrey he wrote:
> > Hi, Timo & Jark
> >
> > Thanks for your explanation.
> > Agree with you that execution should always be async,
> > and that the sync execution scenario can be covered by async execution.
> > It helps provide a unified entry point for batch and streaming.
> > I think we can also use sync execution for some testing.
> > So, I agree with you that we provide the `executeSql` method and that it
> > is an async method.
> > If we want a sync method in the future, we can add a method named
> > `executeSqlSync`.
> >
> > I think we've reached an agreement. I will update the document and start
> > the voting process.
> >
> > Best,
> > Godfrey
> >
> >
> > Jark Wu <[hidden email]> wrote on Tue, 31 Mar 2020 at 00:46:
> >
> >> Hi,
> >>
> >> I didn't follow the full discussion.
> >> But I share the same concern with Timo that streaming queries should
> always
> >> be async.
> >> Otherwise, I can imagine it will cause a lot of confusion and problems if
> >> users don't deeply keep the "sync" in mind (e.g. client hangs).
> >> Besides, streaming mode still covers the majority of use cases of Flink
> >> and Flink SQL. We should put usability at a high priority.
> >>
> >> Best,
> >> Jark
> >>
> >>
> >> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:
> >>
> >>> Hi Godfrey,
> >>>
> >>> maybe I wasn't expressing my biggest concern enough in my last mail.
> >>> Even in a singleline and sync execution, I think that streaming queries
> >>> should not block the execution. Otherwise it is not possible to call
> >>> collect() or print() on them afterwards.
> >>>
> >>> "there are too many things that need to be discussed for multiline":
> >>>
> >>> True, I don't want to solve all of them right now. But what I know is
> >>> that our newly introduced methods should fit into a multiline
> >>> execution. There is no big difference between calling `executeSql(A);
> >>> executeSql(B)` and processing a multiline file `A;\nB;`.
> >>>
> >>> I think the example that you mentioned can simply be undefined for now.
> >>> Currently, no catalog is modifying data but just metadata. This is a
> >>> separate discussion.
> >>>
> >>> "result of the second statement is nondeterministic":
> >>>
> >>> Sure, this is nondeterministic. But that is the implementer's fault, and
> >>> we cannot forbid such pipelines.
> >>>
> >>> How about we always execute streaming queries async? It would unblock
> >>> executeSql() and multiline statements.
> >>>
> >>> Having an `executeSqlAsync()` is useful for batch. However, I don't
> >>> want `sync/async` to be the new batch/stream flag. The execution
> >>> behavior should come from the query itself.
> >>>
> >>> Regards,
> >>> Timo
> >>>
> >>>
> >>> On 30.03.20 11:12, godfrey he wrote:
> >>>> Hi Timo,
> >>>>
> >>>> Agree with you that streaming queries are our top priority,
> >>>> but I think there are too many things that need to be discussed for
> >>>> multiline statements:
> >>>> e.g.
> >>>> 1. What's the behavior of mixing DDL and DML in async execution:
> >>>> create table t1 xxx;
> >>>> create table t2 xxx;
> >>>> insert into t2 select * from t1 where xxx;
> >>>> drop table t1; // t1 may be a MySQL table, so the data will also be
> >>>> deleted.
> >>>>
> >>>> t1 is dropped when "insert" job is running.
> >>>>
> >>>> 2. What's the behavior of the unified scenario for async execution
> >>>> (as you mentioned):
> >>>> INSERT INTO t1 SELECT * FROM s;
> >>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>
> >>>> The result of the second statement is nondeterministic, because the
> >>>> first statement may still be running.
> >>>> I think we need to put a lot of effort to define the behavior of
> >>> logically
> >>>> related queries.
> >>>>
> >>>> In this FLIP, I suggest we only handle single statement, and we also
> >>>> introduce an async execute method
> >>>> which is more important and more often used for users.
> >>>>
> >>>> For the sync methods (like `TableEnvironment.executeSql` and
> >>>> `StatementSet.execute`),
> >>>> the result will be returned only after the job is finished or failed.
> >>>> The following methods will be introduced in this FLIP:
> >>>>
> >>>>    /**
> >>>>     * Asynchronously execute the given single statement
> >>>>     */
> >>>> TableEnvironment.executeSqlAsync(String statement): TableResult
> >>>>
> >>>> /**
> >>>>    * Asynchronously execute the dml statements as a batch
> >>>>    */
> >>>> StatementSet.executeAsync(): TableResult
> >>>>
> >>>> public interface TableResult {
> >>>>      /**
> >>>>       * return JobClient for DQL and DML in async mode, else return
> >>>> Optional.empty
> >>>>       */
> >>>>      Optional<JobClient> getJobClient();
> >>>> }
> >>>>
> >>>> what do you think?
> >>>>
> >>>> Best,
> >>>> Godfrey
> >>>>
> >>>> Timo Walther <[hidden email]> wrote on Thu, 26 Mar 2020 at 21:15:
> >>>>
> >>>>> Hi Godfrey,
> >>>>>
> >>>>> executing streaming queries must be our top priority because this is
> >>>>> what distinguishes Flink from competitors. If we change the execution
> >>>>> behavior, we should think about the other cases as well to not break
> >> the
> >>>>> API a third time.
> >>>>>
> >>>>> I fear that just having an async execute method will not be enough
> >>>>> because users should be able to mix streaming and batch queries in a
> >>>>> unified scenario.
> >>>>>
> >>>>> If I remember it correctly, we had some discussions in the past about
> >>>>> what decides about the execution mode of a query. Currently, we would
> >>>>> like to let the query decide, not derive it from the sources.
> >>>>>
> >>>>> So I could image a multiline pipeline as:
> >>>>>
> >>>>> USE CATALOG 'mycat';
> >>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>
> >>>>> For executeMultilineSql():
> >>>>>
> >>>>> sync because regular SQL
> >>>>> sync because regular Batch SQL
> >>>>> async because Streaming SQL
> >>>>>
> >>>>> For executeAsyncMultilineSql():
> >>>>>
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>>
> >>>>> What we should not start for executeAsyncMultilineSql():
> >>>>>
> >>>>> sync because DDL
> >>>>> async because everything should be async
> >>>>> async because everything should be async
> >>>>>
> >>>>> What are you thoughts here?
> >>>>>
> >>>>> Regards,
> >>>>> Timo
> >>>>>
> >>>>>
> >>>>> On 26.03.20 12:50, godfrey he wrote:
> >>>>>> Hi Timo,
> >>>>>>
> >>>>>> I agree with you that streaming queries mostly need async execution.
> >>>>>> In fact, our original plan was to introduce only sync methods in this
> >>>>>> FLIP; async methods (like "executeSqlAsync") would be introduced in
> >>>>>> the future, as mentioned in the appendix.
> >>>>>>
> >>>>>> Maybe the async methods also need to be considered in this FLIP.
> >>>>>>
> >>>>>> I think sync methods are also useful for streaming, e.g. to run
> >>>>>> bounded sources. Maybe we should check whether all sources are
> >>>>>> bounded in sync execution mode.
> >>>>>>
> >>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>> multiline files. Because the first INSERT INTO would block the
> >> further
> >>>>>>> execution.
> >>>>>> Agreed; we need an async method to submit multiline files, and the
> >>>>>> files should be restricted so that DQL and DML always come at the
> >>>>>> end for streaming.
> >>>>>>
> >>>>>> Best,
> >>>>>> Godfrey
> >>>>>>

Re: [DISCUSS] FLIP-84 Feedback Summary

Aljoscha Krettek-2
In reply to this post by godfreyhe
Agreed to what Dawid and Timo said.

To answer your question about multiline SQL: no, we don't think we need
this in Flink 1.11; we only wanted to make sure that the interfaces that
we now put in place will potentially allow this in the future.

Best,
Aljoscha
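To illustrate, here is a minimal sketch of the lazy-submission contract discussed earlier in the thread, where `Iterator#next()` triggers the next statement submission. All types below are simplified stand-ins, not the real Flink API:

```java
// Sketch only: Iterator#next() submits the next statement on demand.
// executeMultilineSql and TableResult are simplified stand-ins, not
// Flink code, and the "parser" is a naive split on ';'.
import java.util.Arrays;
import java.util.Iterator;

public class MultilineSketch {
    static final class TableResult {
        final String statement;
        TableResult(String statement) { this.statement = statement; }
    }

    static Iterator<TableResult> executeMultilineSql(String statements) {
        Iterator<String> parts = Arrays.asList(statements.split(";")).iterator();
        return new Iterator<TableResult>() {
            public boolean hasNext() { return parts.hasNext(); }
            public TableResult next() {
                String stmt = parts.next().trim();
                // A real implementation would submit the statement
                // (asynchronously) to the cluster at this point.
                return new TableResult(stmt);
            }
        };
    }

    public static void main(String[] args) {
        Iterator<TableResult> results =
                executeMultilineSql("CREATE TABLE t (x INT); INSERT INTO t SELECT 1");
        // A caller such as the SQL Client decides when each submission happens:
        while (results.hasNext()) {
            System.out.println("submitted: " + results.next().statement);
        }
    }
}
```

Because the caller controls when each `next()` happens, a service like the SQL Client can process statement by statement, handle each result individually, and stop on the first failure.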

On 01.04.20 09:31, godfrey he wrote:

> Hi, Timo & Dawid,
>
> Thanks so much for the effort of `multiline statements supporting`,
> I have a few questions about this method:
>
> 1. Users can control the execution logic well through the proposed method
> if they know what the statements are (whether a statement is a DDL, a DML
> or something else). But if the statements come from a file, users do not
> know what they are, so the execution behavior is unclear.
> As a platform user, I think this method is hard to use, unless the
> platform defines a set of rules about statement order, such as: no SELECT
> in the middle, DML must be at the tail of the SQL file (which is probably
> the most common case in production). Otherwise the platform must parse
> the SQL first to know what the statements are. And if it does that
> anyway, it can handle all cases through `executeSql` and `StatementSet`.
>
> 2. The SQL Client also can't use `executeMultilineSql` to support
> multiline statements, because the SQL Client introduces some special
> commands, such as `quit`, `source`, `load jar` (the last does not exist
> yet, but we may need such a command to support dynamic table sources and
> UDFs). Does TableEnvironment also support those commands?
>
> 3. btw, must we have this feature in release-1.11? I find there are a few
> use cases in the feedback document whose behavior is unclear now.
>
> Regarding "change the return value from `Iterable<Row>` to
> `Iterator<Row>`": I couldn't agree more with this change. Just as Dawid
> mentioned, "The contract of the Iterable#iterator is that it returns a new
> iterator each time, which effectively means we can iterate the results
> multiple times." We do not support iterating the results multiple times;
> if we wanted to, the client would have to buffer all results, which is
> impossible for a streaming job.
>
> Best,
> Godfrey
>
Dawid Wysakowicz <[hidden email]> wrote on Wed, 1 Apr 2020 at 03:14:
>
>> Thank you Timo for the great summary! It covers (almost) all the topics.
>> Even though in the end we are not suggesting much changes to the current
>> state of FLIP I think it is important to lay out all possible use cases
>> so that we do not change the execution model every release.
>>
>> There is one additional thing we discussed. Could we change the result
>> type of TableResult#collect to Iterator<Row>? Even though those
>> interfaces do not differ much, I think Iterator better describes that
>> the results might not be materialized on the client side, but can be
>> retrieved on a per-record basis.
>> is that it returns a new iterator each time, which effectively means we
>> can iterate the results multiple times. Iterating the results is not
>> possible when we don't retrieve all the results from the cluster at once.
>>
>> I think we should also use Iterator for
>> TableEnvironment#executeMultilineSql(String statements):
>> Iterator<TableResult>.
>>
>> Best,
>>
>> Dawid
>>
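Dawid's point about the Iterable vs. Iterator contract can be illustrated with a plain-Java sketch (class and method names here are illustrative, not Flink API): an Iterable promises a fresh iterator per call, which a result that is streamed from the cluster exactly once cannot honor.

```java
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;

// Models a result stream that arrives from the cluster once and is not
// buffered on the client. Exposing it as Iterable would wrongly promise
// a fresh pass per iterator() call; Iterator makes single-pass explicit.
class SinglePassResult implements Iterator<String> {
    private final Queue<String> pending;

    SinglePassResult(Queue<String> rowsFromCluster) {
        this.pending = rowsFromCluster;
    }

    @Override
    public boolean hasNext() {
        return !pending.isEmpty();
    }

    @Override
    public String next() {
        // Each row is handed out exactly once, as it would be when pulled
        // from a socket/REST endpoint.
        return pending.poll();
    }

    static int countRows(Iterator<String> it) {
        int n = 0;
        while (it.hasNext()) {
            it.next();
            n++;
        }
        return n;
    }
}
```

A second pass over the same SinglePassResult yields nothing, which is precisely the repeatability that an Iterable-based signature would falsely advertise.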
>> On 31/03/2020 19:27, Timo Walther wrote:
>>> Hi Godfrey,
>>>
>>> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
>>> particular, we discussed how the current status of the FLIP and the
>>> future requirements around multiline statements, async/sync, collect()
>>> fit together.
>>>
>>> We also updated the FLIP-84 Feedback Summary document [1] with some
>>> use cases.
>>>
>>> We believe that we found a good solution that also fits what is in
>>> the current FLIP. So no bigger changes are necessary, which is great!
>>>
>>> Our findings were:
>>>
>>> 1. Async vs sync submission of Flink jobs:
>>>
>>> Having a blocking `execute()` in DataStream API was rather a mistake.
>>> Instead all submissions should be async because this allows supporting
>>> both modes if necessary. Thus, submitting all queries async sounds
>>> good to us. If users want to run a job sync, they can use the
>>> JobClient and wait for completion (or collect() in case of batch jobs).
>>>
>>> 2. Multi-statement execution:
>>>
>>> For the multi-statement execution, we don't see a contradiction with
>>> the async execution behavior. We imagine a method like:
>>>
>>> TableEnvironment#executeMultilineSql(String statements):
>>> Iterable<TableResult>
>>>
>>> Where the `Iterator#next()` method would trigger the next statement
>>> submission. This allows a caller to decide synchronously when to
>>> submit statements async to the cluster. Thus, a service such as the
>>> SQL Client can handle the result of each statement individually and
>>> process statement by statement sequentially.
>>>
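The lazy-submission contract sketched in point 2 can be modeled in plain Java (all names here are assumptions for illustration, not Flink API):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Models the proposed executeMultilineSql contract: a statement is only
// submitted when the caller advances the iterator. `submit` stands in for
// the actual cluster submission.
class LazyStatementIterator implements Iterator<String> {
    private final List<String> statements;
    private final Function<String, String> submit;
    private int pos = 0;
    final List<String> submitted = new ArrayList<>();

    LazyStatementIterator(List<String> statements, Function<String, String> submit) {
        this.statements = statements;
        this.submit = submit;
    }

    @Override
    public boolean hasNext() {
        return pos < statements.size();
    }

    @Override
    public String next() {
        // Submission happens here, not at iterator creation, so the caller
        // decides synchronously when each statement goes to the cluster.
        String stmt = statements.get(pos++);
        submitted.add(stmt);
        return submit.apply(stmt);
    }
}
```

Nothing is submitted until next() is called; a caller that never iterates submits nothing, which is also the pitfall Kurt points out later in the thread.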
>>> 3. The role of TableResult and result retrieval in general
>>>
>>> `TableResult` is similar to `JobClient`. Instead of returning a
>>> `CompletableFuture` of something, it is a concrete util class where
>>> some methods have the behavior of completable future (e.g. collect(),
>>> print()) and some are already completed (getTableSchema(),
>>> getResultKind()).
>>>
>>> `StatementSet#execute()` returns a single `TableResult` because the
>>> order is undefined in a set and all statements have the same schema.
>>> Its `collect()` will return a row for each executed `INSERT INTO` in
>>> the order of statement definition.
>>>
>>> For a simple `SELECT * FROM ...`, the query execution might block until
>>> `collect()` is called to pull buffered rows from the job (via
>>> socket/REST API, or whatever we will use in the future). We can say that
>>> a statement finished successfully when `collect()`'s `Iterator#hasNext`
>>> has returned false.
>>>
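The async-by-default model from point 1 — submission returns a handle immediately and blocking is an explicit follow-up call — can be sketched with a CompletableFuture (a simplified stand-in; the real JobClient API differs):

```java
import java.util.concurrent.CompletableFuture;

// Models "submit async, optionally wait": submission returns a handle
// immediately, and blocking is a separate, explicit call by the user
// (mirroring JobClient-style completion waiting). Names are illustrative.
class AsyncSubmitDemo {
    static CompletableFuture<String> submitJob(String statement) {
        // Submission itself returns right away with a future handle.
        return CompletableFuture.supplyAsync(() -> "FINISHED: " + statement);
    }

    static String runSync(String statement) {
        // "Sync" execution is just async submission plus an explicit wait.
        return submitJob(statement).join();
    }
}
```

With this shape, both modes are served by one entry point: callers who need blocking semantics simply wait on the handle.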
>>> I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
>>>
>>> It would be great if we can add these findings to the FLIP before we
>>> start voting.
>>>
>>> One minor thing: some `execute()` methods still throw a checked
>>> exception; can we remove that from the FLIP? Also the above mentioned
>>> `Iterator#next()` would trigger an execution without throwing a
>>> checked exception.
>>>
>>> Thanks,
>>> Timo
>>>
>>> [1]
>>>
>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
>>>
>>> On 31.03.20 06:28, godfrey he wrote:
>>>> Hi, Timo & Jark
>>>>
>>>> Thanks for your explanation.
>>>> Agree with you that streaming queries should always be async,
>>>> and the sync execution scenario can be covered by async execution.
>>>> It helps provide a unified entry point for batch and streaming.
>>>> I think we can also use sync execution for some testing.
>>>> So, I agree with you that we provide the `executeSql` method and that it
>>>> is an async method.
>>>> If we want a sync method in the future, we can add a method named
>>>> `executeSqlSync`.
>>>>
>>>> I think we've reached an agreement. I will update the document and
>>>> start
>>>> the voting process.
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>>
>>>> Jark Wu <[hidden email]> wrote on Tue, Mar 31, 2020 at 12:46 AM:
>>>>
>>>>> Hi,
>>>>>
>>>>> I didn't follow the full discussion,
>>>>> but I share the same concern with Timo that streaming queries should
>>>>> always
>>>>> be async.
>>>>> Otherwise, I can imagine it will cause a lot of confusion and problems if
>>>>> users don't keep the "sync" firmly in mind (e.g. the client hangs).
>>>>> Besides, the streaming mode still makes up the majority of use cases of
>>>>> Flink and
>>>>> Flink SQL. We should give usability a high priority.
>>>>>
>>>>> Best,
>>>>> Jark
>>>>>
>>>>>
>>>>> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]> wrote:
>>>>>
>>>>>> Hi Godfrey,
>>>>>>
>>>>>> maybe I wasn't expressing my biggest concern clearly enough in my last mail.
>>>>>> Even in a singleline and sync execution, I think that streaming
>>>>>> queries
>>>>>> should not block the execution. Otherwise it is not possible to call
>>>>>> collect() or print() on them afterwards.
>>>>>>
>>>>>> "there are too many things need to discuss for multiline":
>>>>>>
>>>>>> True, I don't want to solve all of them right now. But what I know is
>>>>>> that our newly introduced methods should fit into a multiline
>>>>>> execution.
>>>>>> There is no big difference between calling `executeSql(A),
>>>>>> executeSql(B)` and
>>>>>> processing a multiline file `A;\nB;`.
>>>>>>
>>>>>> I think the example that you mentioned can simply be undefined for
>>>>>> now.
>>>>>> Currently, no catalog is modifying data but just metadata. This is a
>>>>>> separate discussion.
>>>>>>
>>>>>> "result of the second statement is indeterministic":
>>>>>>
>>>>>> Sure, this is indeterministic. But this is the implementer's fault
>>>>>> and we
>>>>>> cannot forbid such pipelines.
>>>>>>
>>>>>> How about we always execute streaming queries async? It would unblock
>>>>>> executeSql() and multiline statements.
>>>>>>
>>>>>> Having an `executeSqlAsync()` is useful for batch. However, I don't
>>>>>> want
>>>>>> `sync/async` to be the new batch/stream flag. The execution behavior
>>>>>> should
>>>>>> come from the query itself.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>>> On 30.03.20 11:12, godfrey he wrote:
>>>>>>> Hi Timo,
>>>>>>>
>>>>>>> Agree with you that streaming queries are our top priority,
>>>>>>> but I think there are too many things to discuss for multiline
>>>>>>> statements:
>>>>>>> e.g.
>>>>>>> 1. what's the behavior of mixing DDL and DML for async execution:
>>>>>>> create table t1 xxx;
>>>>>>> create table t2 xxx;
>>>>>>> insert into t2 select * from t1 where xxx;
>>>>>>> drop table t1; // t1 may be a MySQL table, the data will also be
>>>>> deleted.
>>>>>>>
>>>>>>> t1 is dropped while the "insert" job is running.
>>>>>>>
>>>>>>> 2. what's the behavior of the unified scenario for async execution:
>>>>>>> (as you
>>>>>>> mentioned)
>>>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>>>
>>>>>>> The result of the second statement is indeterministic, because the
>>>>> first
>>>>>>> statement may still be running.
>>>>>>> I think we need to put a lot of effort into defining the behavior of
>>>>>> logically
>>>>>>> related queries.
>>>>>>>
>>>>>>> In this FLIP, I suggest we only handle single statements, and we also
>>>>>>> introduce an async execute method,
>>>>>>> which is more important and more often used by users.
>>>>>>>
>>>>>>> For the sync methods (like `TableEnvironment.executeSql` and
>>>>>>> `StatementSet.execute`),
>>>>>>> the result will only be returned after the job is finished. The following
>>>>>>> methods will be introduced in this FLIP:
>>>>>>>
>>>>>>>     /**
>>>>>>>      * Asynchronously execute the given single statement
>>>>>>>      */
>>>>>>> TableEnvironment.executeSqlAsync(String statement): TableResult
>>>>>>>
>>>>>>> /**
>>>>>>>     * Asynchronously execute the dml statements as a batch
>>>>>>>     */
>>>>>>> StatementSet.executeAsync(): TableResult
>>>>>>>
>>>>>>> public interface TableResult {
>>>>>>>       /**
>>>>>>>        * return JobClient for DQL and DML in async mode, else return
>>>>>>> Optional.empty
>>>>>>>        */
>>>>>>>       Optional<JobClient> getJobClient();
>>>>>>> }
>>>>>>>
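The Optional<JobClient> contract proposed here (empty for statements that submit no job) can be sketched with stand-in types:

```java
import java.util.Optional;

// Stand-in for the proposed TableResult#getJobClient() contract: DDL-like
// statements submit no job and return Optional.empty(), while DML/DQL in
// async mode return a handle. These are toy classes, not Flink's types.
class FakeTableResult {
    private final String jobId; // null when no Flink job was submitted

    private FakeTableResult(String jobId) {
        this.jobId = jobId;
    }

    static FakeTableResult forDdl() {
        return new FakeTableResult(null);
    }

    static FakeTableResult forDml(String jobId) {
        return new FakeTableResult(jobId);
    }

    Optional<String> getJobClient() {
        return Optional.ofNullable(jobId);
    }
}
```

Callers then branch on presence of the handle instead of guessing the statement kind.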
>>>>>>> what do you think?
>>>>>>>
>>>>>>> Best,
>>>>>>> Godfrey
>>>>>>>
>>>>>>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 9:15 PM:
>>>>>>>
>>>>>>>> Hi Godfrey,
>>>>>>>>
>>>>>>>> executing streaming queries must be our top priority because this is
>>>>>>>> what distinguishes Flink from competitors. If we change the
>>>>>>>> execution
>>>>>>>> behavior, we should think about the other cases as well to not break
>>>>> the
>>>>>>>> API a third time.
>>>>>>>>
>>>>>>>> I fear that just having an async execute method will not be enough
>>>>>>>> because users should be able to mix streaming and batch queries in a
>>>>>>>> unified scenario.
>>>>>>>>
>>>>>>>> If I remember it correctly, we had some discussions in the past
>>>>>>>> about
>>>>>>>> what decides about the execution mode of a query. Currently, we
>>>>>>>> would
>>>>>>>> like to let the query decide, not derive it from the sources.
>>>>>>>>
>>>>>>>> So I could imagine a multiline pipeline as:
>>>>>>>>
>>>>>>>> USE CATALOG 'mycat';
>>>>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>>>>
>>>>>>>> For executeMultilineSql():
>>>>>>>>
>>>>>>>> sync because regular SQL
>>>>>>>> sync because regular Batch SQL
>>>>>>>> async because Streaming SQL
>>>>>>>>
>>>>>>>> For executeAsyncMultilineSql():
>>>>>>>>
>>>>>>>> async because everything should be async
>>>>>>>> async because everything should be async
>>>>>>>> async because everything should be async
>>>>>>>>
>>>>>>>> What we should not start for executeAsyncMultilineSql():
>>>>>>>>
>>>>>>>> sync because DDL
>>>>>>>> async because everything should be async
>>>>>>>> async because everything should be async
>>>>>>>>
>>>>>>>> What are your thoughts here?
>>>>>>>>
>>>>>>>> Regards,
>>>>>>>> Timo
>>>>>>>>
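The sync/async decision lists above can be condensed into a small sketch (a toy classifier under the assumption that streaming statements are recognizable, e.g. by a trailing `EMIT STREAM`; not a real SQL parser):

```java
// Sketch of the per-statement decision outlined above for
// executeMultilineSql: regular DDL/batch SQL runs sync, while streaming
// SQL and everything under the async entry point runs async.
class ExecutionModeDemo {
    enum Mode { SYNC, ASYNC }

    static Mode modeFor(String statement, boolean asyncEntryPoint) {
        if (asyncEntryPoint) {
            return Mode.ASYNC; // executeAsyncMultilineSql: everything async
        }
        if (statement.trim().toUpperCase().endsWith("EMIT STREAM")) {
            return Mode.ASYNC; // streaming SQL should not block
        }
        return Mode.SYNC; // regular DDL / batch SQL
    }
}
```

This also captures the rejected variant: under the async entry point, even DDL should not fall back to sync.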
>>>>>>>>
>>>>>>>> On 26.03.20 12:50, godfrey he wrote:
>>>>>>>>> Hi Timo,
>>>>>>>>>
>>>>>>>>> I agree with you that streaming queries mostly need async
>>>>>>>>> execution.
>>>>>>>>> In fact, our original plan is only introducing sync methods in this
>>>>>> FLIP,
>>>>>>>>> and async methods (like "executeSqlAsync") will be introduced in
>>>>>>>>> the
>>>>>>>> future,
>>>>>>>>> as mentioned in the appendix.
>>>>>>>>>
>>>>>>>>> Maybe the async methods also need to be considered in this FLIP.
>>>>>>>>>
>>>>>>>>> I think sync methods are also useful for streaming, which can be used
>>>>> to
>>>>>>>> run
>>>>>>>>> bounded sources.
>>>>>>>>> Maybe we should check whether all sources are bounded in sync
>>>>> execution
>>>>>>>>> mode.
>>>>>>>>>
>>>>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>>>>> multiline files. Because the first INSERT INTO would block the
>>>>> further
>>>>>>>>>> execution.
>>>>>>>>> agree with you, we need an async method to submit multiline files,
>>>>>>>>> and files should be restricted so that DQL and DML statements
>>>>>>>>> always come at
>>>>>> the
>>>>>>>>> end for streaming.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Godfrey
>>>>>>>>>
>>>>>>>>> Timo Walther <[hidden email]> wrote on Thu, Mar 26, 2020 at 4:29 PM:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> Hi Godfrey,
>>>>>>>>>>
>>>>>>>>>> having control over the job after submission is a requirement that
>>>>> was
>>>>>>>>>> requested frequently (some examples are [1], [2]). Users would
>>>>>>>>>> like
>>>>> to
>>>>>>>>>> get insights into the running or completed job, including the
>>>>> jobId,
>>>>>>>>>> jobGraph, etc.; the JobClient summarizes these properties.
>>>>>>>>>>
>>>>>>>>>> It is good to have a discussion about synchronous/asynchronous
>>>>>>>>>> submission now to have a complete execution picture.
>>>>>>>>>>
>>>>>>>>>> I thought we submit streaming queries mostly async and just
>>>>>>>>>> wait for
>>>>>> the
>>>>>>>>>> successful submission. If we block for streaming queries, how
>>>>>>>>>> can we
>>>>>>>>>> collect() or print() results?
>>>>>>>>>>
>>>>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>>>>> multiline files. Because the first INSERT INTO would block the
>>>>> further
>>>>>>>>>> execution.
>>>>>>>>>>
>>>>>>>>>> If we decide to block entirely on streaming queries, we need the
>>>>> async
>>>>>>>>>> execution methods in the design already. However, I would
>>>>>>>>>> rather go
>>>>>> for
>>>>>>>>>> non-blocking streaming queries. Also with the `EMIT STREAM`
>>>>>>>>>> keyword
>>>>>> in
>>>>>>>>>> mind that we might add to SQL statements soon.
>>>>>>>>>>
>>>>>>>>>> Regards,
>>>>>>>>>> Timo
>>>>>>>>>>
>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
>>>>>>>>>>
>>>>>>>>>> On 25.03.20 16:30, godfrey he wrote:
>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for the update.
>>>>>>>>>>>
>>>>>>>>>>> Regarding to "multiline statement support", I'm also fine that
>>>>>>>>>>> `TableEnvironment.executeSql()` only supports single line
>>>>> statement,
>>>>>>>> and
>>>>>>>>>> we
>>>>>>>>>>> can support multiline statement later (needs more discussion
>>>>>>>>>>> about
>>>>>>>> this).
>>>>>>>>>>>
>>>>>>>>>>> Regarding to "StatementSet.explian()", I don't have strong
>>>>>>>>>>> opinions
>>>>>>>> about
>>>>>>>>>>> that.
>>>>>>>>>>>
>>>>>>>>>>> Regarding to "TableResult.getJobClient()", I think it's
>>>>> unnecessary.
>>>>>>>> The
>>>>>>>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)
>>>>>>>>>>> will
>>>>>> not
>>>>>>>>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
>>>>>>>>>>> `StatementSet.execute()` are synchronous methods, `TableResult`
>>>>>>>>>>> will
>>>>>> be
>>>>>>>>>>> returned only after the job is finished or failed.
>>>>>>>>>>>
>>>>>>>>>>> Regarding to "whether StatementSet.execute() needs to throw
>>>>>>>> exception", I
>>>>>>>>>>> think we should choose a unified way to tell whether the
>>>>>>>>>>> execution
>>>>> is
>>>>>>>>>>> successful. If `TableResult` contains ERROR kind (non-runtime
>>>>>>>> exception),
>>>>>>>>>>> users need to not only check the result but also catch the
>>>>>>>>>>> runtime
>>>>>>>>>>> exception in their code. Or `StatementSet.execute()` does not
>>>>>>>>>>> throw
>>>>>> any
>>>>>>>>>>> exception (including runtime exception), all exception
>>>>>>>>>>> messages are
>>>>>> in
>>>>>>>>>> the
>>>>>>>>>>> result.  I prefer "StatementSet.execute() needs to throw
>>>>> exception".
>>>>>> cc
>>>>>>>>>> @Jark
>>>>>>>>>>> Wu <[hidden email]>
>>>>>>>>>>>
>>>>>>>>>>> I will update the agreed parts in the document first.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Godfrey
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Timo Walther <[hidden email]> wrote on Wed, Mar 25, 2020
>>>>>>>>>>> at 6:51 PM:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Godfrey,
>>>>>>>>>>>>
>>>>>>>>>>>> thanks for starting the discussion on the mailing list. And
>>>>>>>>>>>> sorry
>>>>>>>> again
>>>>>>>>>>>> for the late reply to FLIP-84. I have updated the Google doc one
>>>>>> more
>>>>>>>>>>>> time to incorporate the offline discussions.
>>>>>>>>>>>>
>>>>>>>>>>>>       From Dawid's and my view, it is fine to postpone the
>>>>>>>>>>>> multiline
>>>>>>>> support
>>>>>>>>>>>> to a separate method. This can be future work even though we
>>>>>>>>>>>> will
>>>>>> need
>>>>>>>>>>>> it rather soon.
>>>>>>>>>>>>
>>>>>>>>>>>> If there are no objections, I suggest to update the FLIP-84
>>>>>>>>>>>> again
>>>>>> and
>>>>>>>>>>>> have another voting process.
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks,
>>>>>>>>>>>> Timo
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
>>>>>>>>>>>>> Hi community,
>>>>>>>>>>>>> Timo, Fabian and Dawid have some feedback about FLIP-84 [1].
>>>>>>>>>>>>> The
>>>>>>>>>>>> feedback items
>>>>>>>>>>>>> are all about newly introduced methods. We had a discussion
>>>>>> yesterday,
>>>>>>>>>> and
>>>>>>>>>>>>> most of the feedback has been agreed upon. Here are the
>>>>>>>>>>>>> conclusions:
>>>>>>>>>>>>>
>>>>>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
>>>>>>>>>>>>>
>>>>>>>>>>>>> the original proposed methods:
>>>>>>>>>>>>>
>>>>>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
>>>>>>>>>>>>> TableEnvironment.executeStatement(String statement):
>>>>>>>>>>>>> ResultTable
>>>>>>>>>>>>>
>>>>>>>>>>>>> the new proposed methods:
>>>>>>>>>>>>>
>>>>>>>>>>>>> // we should not use abbreviations in the API, and the term
>>>>> "Batch"
>>>>>>>> is
>>>>>>>>>>>>> easily confused with batch/streaming processing
>>>>>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
>>>>>>>>>>>>>
>>>>>>>>>>>>> // every method that takes SQL should have `Sql` in its name
>>>>>>>>>>>>> // supports multiline statement ???
>>>>>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
>>>>>>>>>>>>>
>>>>>>>>>>>>> // new methods. supports explaining DQL and DML
>>>>>>>>>>>>> TableEnvironment.explainSql(String statement, ExplainDetail...
>>>>>>>>>> details):
>>>>>>>>>>>>> String
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *2. about proposed related classes:*
>>>>>>>>>>>>>
>>>>>>>>>>>>> the original proposed classes:
>>>>>>>>>>>>>
>>>>>>>>>>>>> interface DmlBatch {
>>>>>>>>>>>>>           void addInsert(String insert);
>>>>>>>>>>>>>           void addInsert(String targetPath, Table table);
>>>>>>>>>>>>>           ResultTable execute() throws Exception;
>>>>>>>>>>>>>           String explain(boolean extended);
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> public interface ResultTable {
>>>>>>>>>>>>>           TableSchema getResultSchema();
>>>>>>>>>>>>>           Iterable<Row> getResultRows();
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> the new proposed classes:
>>>>>>>>>>>>>
>>>>>>>>>>>>> interface StatementSet {
>>>>>>>>>>>>>           // every method that takes SQL should have `Sql` in
>>>>>>>>>>>>> its
>>>>>> name
>>>>>>>>>>>>>           // return StatementSet instance for fluent programming
>>>>>>>>>>>>>           addInsertSql(String statement): StatementSet
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // return StatementSet instance for fluent programming
>>>>>>>>>>>>>           addInsert(String tablePath, Table table): StatementSet
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // new method. support overwrite mode
>>>>>>>>>>>>>           addInsert(String tablePath, Table table, boolean
>>>>>> overwrite):
>>>>>>>>>>>>> StatementSet
>>>>>>>>>>>>>
>>>>>>>>>>>>>           explain(): String
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // new method. supports adding more details for the
>>>>> result
>>>>>>>>>>>>>           explain(ExplainDetail... extraDetails): String
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // throw exception ???
>>>>>>>>>>>>>           execute(): TableResult
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> interface TableResult {
>>>>>>>>>>>>>           getTableSchema(): TableSchema
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // avoid custom parsing of an "OK" row in programming
>>>>>>>>>>>>>           getResultKind(): ResultKind
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // instead of `get` make it explicit that this is
>>>>>>>>>>>>> might
>>>>> be
>>>>>>>>>>>> triggering
>>>>>>>>>>>>> an expensive operation
>>>>>>>>>>>>>           collect(): Iterable<Row>
>>>>>>>>>>>>>
>>>>>>>>>>>>>           // for fluent programming
>>>>>>>>>>>>>           print(): Unit
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> enum ResultKind {
>>>>>>>>>>>>>           SUCCESS, // for DDL, DCL and statements with a simple
>>>>> "OK"
>>>>>>>>>>>>>           SUCCESS_WITH_CONTENT, // rows with important
>>>>>>>>>>>>> content are
>>>>>>>>>> available
>>>>>>>>>>>>> (DML, DQL)
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
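As a usage sketch of the fluent StatementSet proposed above (modeled with a plain builder; these are toy classes, not the actual Flink interfaces):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal model of the proposed fluent StatementSet: each add* method
// returns `this` for chaining, and execute() acts on all buffered inserts
// at once.
class FluentStatementSet {
    private final List<String> inserts = new ArrayList<>();

    FluentStatementSet addInsertSql(String statement) {
        inserts.add(statement);
        return this; // fluent style, as proposed
    }

    FluentStatementSet addInsert(String tablePath, String query, boolean overwrite) {
        inserts.add((overwrite ? "INSERT OVERWRITE " : "INSERT INTO ")
                + tablePath + " " + query);
        return this;
    }

    int execute() {
        // Stands in for submitting all buffered statements as one job;
        // returns how many inserts were part of the set.
        return inserts.size();
    }
}
```

Returning `this` is what makes `createStatementSet().addInsertSql(...).addInsert(...).execute()` read as one pipeline.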
>>>>>>>>>>>>>
>>>>>>>>>>>>> *3. new proposed methods in `Table`*
>>>>>>>>>>>>>
>>>>>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
>>>>> methods
>>>>>>>> are
>>>>>>>>>>>>> introduced:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Table.executeInsert(String tablePath): TableResult
>>>>>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
>>>>>> TableResult
>>>>>>>>>>>>> Table.explain(ExplainDetail... details): String
>>>>>>>>>>>>> Table.execute(): TableResult
>>>>>>>>>>>>>
>>>>>>>>>>>>> There are two issues that need further discussion: one is whether
>>>>>>>>>>>>> `TableEnvironment.executeSql(String statement): TableResult`
>>>>> needs
>>>>>> to
>>>>>>>>>>>>> support multiline statements (or whether `TableEnvironment`
>>>>>>>>>>>>> needs
>>>>> to
>>>>>>>>>>>> support
>>>>>>>>>>>>> multiline statements), and the other is whether
>>>>>>>>>> `StatementSet.execute()`
>>>>>>>>>>>>> needs to throw an exception.
>>>>>>>>>>>>>
>>>>>>>>>>>>> please refer to the feedback document [2] for the details.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Any suggestions are warmly welcomed!
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
>>>>>
>>>>>>>>>>>>> [2]
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>
>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Best,
>>>>>>>>>>>>> Godfrey
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>>
>

Re: [DISCUSS] FLIP-84 Feedback Summary

Kurt Young
One comment on `executeMultilineSql`: I'm afraid sometimes users might
forget to
iterate the returned iterator, e.g. a user submits a bunch of DDLs and
expects the
framework to execute them one by one. But it doesn't.

Best,
Kurt


On Wed, Apr 1, 2020 at 5:10 PM Aljoscha Krettek <[hidden email]> wrote:

> Agreed to what Dawid and Timo said.
>
> To answer your question about multiline SQL: no, we don't think we need
> this in Flink 1.11, we only wanted to make sure that the interfaces that
> we now put in place will potentially allow this in the future.
>
> Best,
> Aljoscha
>
> On 01.04.20 09:31, godfrey he wrote:
> > Hi, Timo & Dawid,
> >
> > Thanks so much for the effort on `multiline statement support`,
> > I have a few questions about this method:
> >
> > 1. users can control the execution logic well through the proposed method
> >   if they know what the statements are (whether a statement is a DDL, a
> > DML or something else).
> > But if the statements come from a file, users do not know what the
> > statements are,
> > so the execution behavior is unclear.
> > As a platform user, I think this method is hard to use, unless the
> platform
> > defines
> > a set of rules about statement order, such as: no SELECT in the
> middle,
> > DML must be at the tail of the SQL file (which may be the most common
> > case in production environments).
> > Otherwise the platform must parse the SQL first to know what the
> > statements are.
> > If it does that, the platform can handle all cases through `executeSql`
> and
> > `StatementSet`.
> >
> > 2. The SQL client also can't use `executeMultilineSql` to support
> > multiline statements,
> >   because there are some special commands introduced in the SQL client,
> > such as `quit`, `source`, `load jar` (it does not exist now, but maybe
> > we need this
> > command
> >   to support dynamic table sources and UDFs).
> > Does TableEnvironment also support those commands?
> >
> > 3. btw, must we have this feature in release-1.11? I find there are a
> > few use cases
> >   in the feedback document whose behavior is unclear now.
> >
> > Regarding "change the return value from `Iterable<Row>` to
> > `Iterator<Row>`",
> > I couldn't agree more with this change. Just as Dawid mentioned,
> > "The contract of the Iterable#iterator is that it returns a new iterator
> > each time,
> >   which effectively means we can iterate the results multiple times.",
> > we do not support iterating the results multiple times.
> > If we wanted to do that, the client would have to buffer all results,
> > but that is
> impossible
> > for a streaming job.
> >
> > Best,
> > Godfrey
> >
> > Dawid Wysakowicz <[hidden email]> 于2020年4月1日周三 上午3:14写道:
> >
> >> Thank you Timo for the great summary! It covers (almost) all the topics.
> >> Even though in the end we are not suggesting much changes to the current
> >> state of FLIP I think it is important to lay out all possible use cases
> >> so that we do not change the execution model every release.
> >>
> >> There is one additional thing we discussed. Could we change the result
> >> type of TableResult#collect to Iterator<Row>? Even though those
> >> interfaces do not differ much. I think Iterator better describes that
> >> the results might not be materialized on the client side, but can be
> >> retrieved on a per record basis. The contract of the Iterable#iterator
> >> is that it returns a new iterator each time, which effectively means we
> >> can iterate the results multiple times. Iterating the results is not
> >> possible when we don't retrieve all the results from the cluster at
> once.
> >>
> >> I think we should also use Iterator for
> >> TableEnvironment#executeMultilineSql(String statements):
> >> Iterator<TableResult>.
> >>
> >> Best,
> >>
> >> Dawid
> >>
> >> On 31/03/2020 19:27, Timo Walther wrote:
> >>> Hi Godfrey,
> >>>
> >>> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
> >>> particular, we discussed how the current status of the FLIP and the
> >>> future requirements around multiline statements, async/sync, collect()
> >>> fit together.
> >>>
> >>> We also updated the FLIP-84 Feedback Summary document [1] with some
> >>> use cases.
> >>>
> >>> We believe that we found a good solution that also fits to what is in
> >>> the current FLIP. So no bigger changes necessary, which is great!
> >>>
> >>> Our findings were:
> >>>
> >>> 1. Async vs sync submission of Flink jobs:
> >>>
> >>> Having a blocking `execute()` in DataStream API was rather a mistake.
> >>> Instead all submissions should be async because this allows supporting
> >>> both modes if necessary. Thus, submitting all queries async sounds
> >>> good to us. If users want to run a job sync, they can use the
> >>> JobClient and wait for completion (or collect() in case of batch jobs).
> >>>
> >>> 2. Multi-statement execution:
> >>>
> >>> For the multi-statement execution, we don't see a contradication with
> >>> the async execution behavior. We imagine a method like:
> >>>
> >>> TableEnvironment#executeMultilineSql(String statements):
> >>> Iterable<TableResult>
> >>>
> >>> Where the `Iterator#next()` method would trigger the next statement
> >>> submission. This allows a caller to decide synchronously when to
> >>> submit statements async to the cluster. Thus, a service such as the
> >>> SQL Client can handle the result of each statement individually and
> >>> process statement by statement sequentially.
> >>>
> >>> 3. The role of TableResult and result retrieval in general
> >>>
> >>> `TableResult` is similar to `JobClient`. Instead of returning a
> >>> `CompletableFuture` of something, it is a concrete util class where
> >>> some methods have the behavior of completable future (e.g. collect(),
> >>> print()) and some are already completed (getTableSchema(),
> >>> getResultKind()).
> >>>
> >>> `StatementSet#execute()` returns a single `TableResult` because the
> >>> order is undefined in a set and all statements have the same schema.
> >>> Its `collect()` will return a row for each executed `INSERT INTO` in
> >>> the order of statement definition.
> >>>
> >>> For simple `SELECT * FROM ...`, the query execution might block until
> >>> `collect()` is called to pull buffered rows from the job (from
> >>> socket/REST API what ever we will use in the future). We can say that
> >>> a statement finished successfully, when the `collect#Iterator#hasNext`
> >>> has returned false.
> >>>
> >>> I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
> >>>
> >>> It would be great if we can add these findings to the FLIP before we
> >>> start voting.
> >>>
> >>> One minor thing: some `execute()` methods still throw a checked
> >>> exception; can we remove that from the FLIP? Also the above mentioned
> >>> `Iterator#next()` would trigger an execution without throwing a
> >>> checked exception.
> >>>
> >>> Thanks,
> >>> Timo
> >>>
> >>> [1]
> >>>
> >>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
> >>>
> >>> On 31.03.20 06:28, godfrey he wrote:
> >>>> Hi, Timo & Jark
> >>>>
> >>>> Thanks for your explanation.
> >>>> Agree with you that async execution should always be async,
> >>>> and sync execution scenario can be covered  by async execution.
> >>>> It helps provide an unified entry point for batch and streaming.
> >>>> I think we can also use sync execution for some testing.
> >>>> So, I agree with you that we provide `executeSql` method and it's
> async
> >>>> method.
> >>>> If we want sync method in the future, we can add method named
> >>>> `executeSqlSync`.
> >>>>
> >>>> I think we've reached an agreement. I will update the document, and
> >>>> start
> >>>> voting process.
> >>>>
> >>>> Best,
> >>>> Godfrey
> >>>>
> >>>>
> >>>> Jark Wu <[hidden email]> 于2020年3月31日周二 上午12:46写道:
> >>>>
> >>>>> Hi,
> >>>>>
> >>>>> I didn't follow the full discussion.
> >>>>> But I share the same concern with Timo that streaming queries should
> >>>>> always
> >>>>> be async.
> >>>>> Otherwise, I can image it will cause a lot of confusion and problems
> if
> >>>>> users don't deeply keep the "sync" in mind (e.g. client hangs).
> >>>>> Besides, the streaming mode is still the majority use cases of Flink
> >>>>> and
> >>>>> Flink SQL. We should put the usability at a high priority.
> >>>>>
> >>>>> Best,
> >>>>> Jark
> >>>>>
> >>>>>
> >>>>> On Mon, 30 Mar 2020 at 23:27, Timo Walther <[hidden email]>
> wrote:
> >>>>>
> >>>>>> Hi Godfrey,
> >>>>>>
> >>>>>> maybe I wasn't expressing my biggest concern enough in my last mail.
> >>>>>> Even in a singleline and sync execution, I think that streaming
> >>>>>> queries
> >>>>>> should not block the execution. Otherwise it is not possible to call
> >>>>>> collect() or print() on them afterwards.
> >>>>>>
> >>>>>> "there are too many things need to discuss for multiline":
> >>>>>>
> >>>>>> True, I don't want to solve all of them right now. But what I know
> is
> >>>>>> that our newly introduced methods should fit into a multiline
> >>>>>> execution.
> >>>>>> There is no big difference of calling `executeSql(A),
> >>>>>> executeSql(B)` and
> >>>>>> processing a multiline file `A;\nB;`.
> >>>>>>
> >>>>>> I think the example that you mentioned can simply be undefined for
> >>>>>> now.
> >>>>>> Currently, no catalog is modifying data but just metadata. This is a
> >>>>>> separate discussion.
> >>>>>>
> >>>>>> "result of the second statement is indeterministic":
> >>>>>>
> >>>>>> Sure this is indeterministic. But this is the implementers fault
> >>>>>> and we
> >>>>>> cannot forbid such pipelines.
> >>>>>>
> >>>>>> How about we always execute streaming queries async? It would
> unblock
> >>>>>> executeSql() and multiline statements.
> >>>>>>
> >>>>>> Having a `executeSqlAsync()` is useful for batch. However, I don't
> >>>>>> want
> >>>>>> `sync/async` be the new batch/stream flag. The execution behavior
> >>>>>> should
> >>>>>> come from the query itself.
> >>>>>>
> >>>>>> Regards,
> >>>>>> Timo
> >>>>>>
> >>>>>>
> >>>>>> On 30.03.20 11:12, godfrey he wrote:
> >>>>>>> Hi Timo,
> >>>>>>>
> >>>>>>> Agree with you that streaming queries is our top priority,
> >>>>>>> but I think there are too many things need to discuss for multiline
> >>>>>>> statements:
> >>>>>>> e.g.
> >>>>>>> 1. what's the behaivor of DDL and DML mixing for async execution:
> >>>>>>> create table t1 xxx;
> >>>>>>> create table t2 xxx;
> >>>>>>> insert into t2 select * from t1 where xxx;
> >>>>>>> drop table t1; // t1 may be a MySQL table, the data will also be
> >>>>> deleted.
> >>>>>>>
> >>>>>>> t1 is dropped when "insert" job is running.
> >>>>>>>
> >>>>>>> 2. what's the behaivor of unified scenario for async execution:
> >>>>>>> (as you
> >>>>>>> mentioned)
> >>>>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>>>
> >>>>>>> The result of the second statement is indeterministic, because the
> >>>>> first
> >>>>>>> statement maybe is running.
> >>>>>>> I think we need to put a lot of effort to define the behavior of
> >>>>>> logically
> >>>>>>> related queries.
> >>>>>>>
> >>>>>>> In this FLIP, I suggest we only handle single statement, and we
> also
> >>>>>>> introduce an async execute method
> >>>>>>> which is more important and more often used for users.
> >>>>>>>
> >>>>>>> For the sync methods (like `TableEnvironment.executeSql` and
> >>>>>>> `StatementSet.execute`),
> >>>>>>> the result will be returned until the job is finished. The
> following
> >>>>>>> methods will be introduced in this FLIP:
> >>>>>>>
> >>>>>>>     /**
> >>>>>>>      * Asynchronously execute the given single statement
> >>>>>>>      */
> >>>>>>> TableEnvironment.executeSqlAsync(String statement): TableResult
> >>>>>>>
> >>>>>>> /**
> >>>>>>>     * Asynchronously execute the dml statements as a batch
> >>>>>>>     */
> >>>>>>> StatementSet.executeAsync(): TableResult
> >>>>>>>
> >>>>>>> public interface TableResult {
> >>>>>>>       /**
> >>>>>>>        * return JobClient for DQL and DML in async mode, else
> return
> >>>>>>> Optional.empty
> >>>>>>>        */
> >>>>>>>       Optional<JobClient> getJobClient();
> >>>>>>> }
> >>>>>>>
> >>>>>>> what do you think?
> >>>>>>>
> >>>>>>> Best,
> >>>>>>> Godfrey
> >>>>>>>
> >>>>>>> Timo Walther <[hidden email]> 于2020年3月26日周四 下午9:15写道:
> >>>>>>>
> >>>>>>>> Hi Godfrey,
> >>>>>>>>
> >>>>>>>> executing streaming queries must be our top priority because this
> is
> >>>>>>>> what distinguishes Flink from competitors. If we change the
> >>>>>>>> execution
> >>>>>>>> behavior, we should think about the other cases as well to not
> break
> >>>>> the
> >>>>>>>> API a third time.
> >>>>>>>>
> >>>>>>>> I fear that just having an async execute method will not be enough
> >>>>>>>> because users should be able to mix streaming and batch queries
> in a
> >>>>>>>> unified scenario.
> >>>>>>>>
> >>>>>>>> If I remember it correctly, we had some discussions in the past
> >>>>>>>> about
> >>>>>>>> what decides about the execution mode of a query. Currently, we
> >>>>>>>> would
> >>>>>>>> like to let the query decide, not derive it from the sources.
> >>>>>>>>
> >>>>>>>> So I could imagine a multiline pipeline as:
> >>>>>>>>
> >>>>>>>> USE CATALOG 'mycat';
> >>>>>>>> INSERT INTO t1 SELECT * FROM s;
> >>>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
> >>>>>>>>
> >>>>>>>> For executeMultilineSql():
> >>>>>>>>
> >>>>>>>> sync because regular SQL
> >>>>>>>> sync because regular Batch SQL
> >>>>>>>> async because Streaming SQL
> >>>>>>>>
> >>>>>>>> For executeAsyncMultilineSql():
> >>>>>>>>
> >>>>>>>> async because everything should be async
> >>>>>>>> async because everything should be async
> >>>>>>>> async because everything should be async
> >>>>>>>>
> >>>>>>>> What we should not start for executeAsyncMultilineSql():
> >>>>>>>>
> >>>>>>>> sync because DDL
> >>>>>>>> async because everything should be async
> >>>>>>>> async because everything should be async
> >>>>>>>>
> >>>>>>>> What are your thoughts here?
> >>>>>>>>
> >>>>>>>> Regards,
> >>>>>>>> Timo
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On 26.03.20 12:50, godfrey he wrote:
> >>>>>>>>> Hi Timo,
> >>>>>>>>>
> >>>>>>>>> I agree with you that streaming queries mostly need async
> >>>>>>>>> execution.
> >>>>>>>>> In fact, our original plan is only introducing sync methods in
> this
> >>>>>> FLIP,
> >>>>>>>>> and async methods (like "executeSqlAsync") will be introduced in
> >>>>>>>>> the
> >>>>>>>> future
> >>>>>>>>> which is mentioned in the appendix.
> >>>>>>>>>
> >>>>>>>>> Maybe the async methods also need to be considered in this FLIP.
> >>>>>>>>>
> >>>>>>>>> I think sync methods is also useful for streaming which can be
> used
> >>>>> to
> >>>>>>>> run
> >>>>>>>>> bounded source.
> >>>>>>>>> Maybe we should check whether all sources are bounded in sync
> >>>>> execution
> >>>>>>>>> mode.
> >>>>>>>>>
> >>>>>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>>>>> multiline files. Because the first INSERT INTO would block the
> >>>>> further
> >>>>>>>>>> execution.
> >>>>>>>>> agree with you, we need async method to submit multiline files,
> >>>>>>>>> and files should be limited that the DQL and DML should be
> >>>>>>>>> always in
> >>>>>> the
> >>>>>>>>> end for streaming.
> >>>>>>>>>
> >>>>>>>>> Best,
> >>>>>>>>> Godfrey
> >>>>>>>>>
> >>>>>>>>> Timo Walther <[hidden email]> 于2020年3月26日周四 下午4:29写道:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> Hi Godfrey,
> >>>>>>>>>>
> >>>>>>>>>> having control over the job after submission is a requirement
> that
> >>>>> was
> >>>>>>>>>> requested frequently (some examples are [1], [2]). Users would
> >>>>>>>>>> like
> >>>>> to
> >>>>>>>>>> get insights about the running or completed job. Including the
> >>>>> jobId,
> >>>>>>>>>> jobGraph etc., the JobClient summarizes these properties.
> >>>>>>>>>>
> >>>>>>>>>> It is good to have a discussion about synchronous/asynchronous
> >>>>>>>>>> submission now to have a complete execution picture.
> >>>>>>>>>>
> >>>>>>>>>> I thought we submit streaming queries mostly async and just
> >>>>>>>>>> wait for
> >>>>>> the
> >>>>>>>>>> successful submission. If we block for streaming queries, how
> >>>>>>>>>> can we
> >>>>>>>>>> collect() or print() results?
> >>>>>>>>>>
> >>>>>>>>>> Also, if we block for streaming queries, we could never support
> >>>>>>>>>> multiline files. Because the first INSERT INTO would block the
> >>>>> further
> >>>>>>>>>> execution.
> >>>>>>>>>>
> >>>>>>>>>> If we decide to block entirely on streaming queries, we need the
> >>>>> async
> >>>>>>>>>> execution methods in the design already. However, I would
> >>>>>>>>>> rather go
> >>>>>> for
> >>>>>>>>>> non-blocking streaming queries. Also with the `EMIT STREAM` key
> >>>>>>>>>> word
> >>>>>> in
> >>>>>>>>>> mind that we might add to SQL statements soon.
> >>>>>>>>>>
> >>>>>>>>>> Regards,
> >>>>>>>>>> Timo
> >>>>>>>>>>
> >>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
> >>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
> >>>>>>>>>>
> >>>>>>>>>> On 25.03.20 16:30, godfrey he wrote:
> >>>>>>>>>>> Hi Timo,
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks for the updating.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding to "multiline statement support", I'm also fine that
> >>>>>>>>>>> `TableEnvironment.executeSql()` only supports single line
> >>>>> statement,
> >>>>>>>> and
> >>>>>>>>>> we
> >>>>>>>>>>> can support multiline statement later (needs more discussion
> >>>>>>>>>>> about
> >>>>>>>> this).
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding "StatementSet.explain()", I don't have strong
> >>>>>>>>>>> opinions
> >>>>>>>> about
> >>>>>>>>>>> that.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding to "TableResult.getJobClient()", I think it's
> >>>>> unnecessary.
> >>>>>>>> The
> >>>>>>>>>>> reason is: first, many statements (e.g. DDL, show xx, use xx)
> >>>>>>>>>>> will
> >>>>>> not
> >>>>>>>>>>> submit a Flink job. second, `TableEnvironment.executeSql()` and
> >>>>>>>>>>> `StatementSet.execute()` are synchronous method, `TableResult`
> >>>>>>>>>>> will
> >>>>>> be
> >>>>>>>>>>> returned only after the job is finished or failed.
> >>>>>>>>>>>
> >>>>>>>>>>> Regarding to "whether StatementSet.execute() needs to throw
> >>>>>>>> exception", I
> >>>>>>>>>>> think we should choose a unified way to tell whether the
> >>>>>>>>>>> execution
> >>>>> is
> >>>>>>>>>>> successful. If `TableResult` contains ERROR kind (non-runtime
> >>>>>>>> exception),
> >>>>>>>>>>> users need to not only check the result but also catch the
> >>>>>>>>>>> runtime
> >>>>>>>>>>> exception in their code. or `StatementSet.execute()` does not
> >>>>>>>>>>> throw
> >>>>>> any
> >>>>>>>>>>> exception (including runtime exception), all exception
> >>>>>>>>>>> messages are
> >>>>>> in
> >>>>>>>>>> the
> >>>>>>>>>>> result.  I prefer "StatementSet.execute() needs to throw
> >>>>> exception".
> >>>>>> cc
> >>>>>>>>>> @Jark
> >>>>>>>>>>> Wu <[hidden email]>
> >>>>>>>>>>>
> >>>>>>>>>>> I will update the agreed parts to the document first.
> >>>>>>>>>>>
> >>>>>>>>>>> Best,
> >>>>>>>>>>> Godfrey
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> Timo Walther <[hidden email]> 于2020年3月25日周三
> >>>>>>>>>>> 下午6:51写道:
> >>>>>>>>>>>
> >>>>>>>>>>>> Hi Godfrey,
> >>>>>>>>>>>>
> >>>>>>>>>>>> thanks for starting the discussion on the mailing list. And
> >>>>>>>>>>>> sorry
> >>>>>>>> again
> >>>>>>>>>>>> for the late reply to FLIP-84. I have updated the Google doc
> one
> >>>>>> more
> >>>>>>>>>>>> time to incorporate the offline discussions.
> >>>>>>>>>>>>
> >>>>>>>>>>>>       From Dawid's and my view, it is fine to postpone the
> >>>>>>>>>>>> multiline
> >>>>>>>> support
> >>>>>>>>>>>> to a separate method. This can be future work even though we
> >>>>>>>>>>>> will
> >>>>>> need
> >>>>>>>>>>>> it rather soon.
> >>>>>>>>>>>>
> >>>>>>>>>>>> If there are no objections, I suggest to update the FLIP-84
> >>>>>>>>>>>> again
> >>>>>> and
> >>>>>>>>>>>> have another voting process.
> >>>>>>>>>>>>
> >>>>>>>>>>>> Thanks,
> >>>>>>>>>>>> Timo
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>> On 25.03.20 11:17, godfrey he wrote:
> >>>>>>>>>>>>> Hi community,
> >>>>>>>>>>>>> Timo, Fabian and Dawid have some feedbacks about FLIP-84[1].
> >>>>>>>>>>>>> The
> >>>>>>>>>>>> feedbacks
> >>>>>>>>>>>>> are all about new introduced methods. We had a discussion
> >>>>>> yesterday,
> >>>>>>>>>> and
> >>>>>>>>>>>>> most of feedbacks have been agreed upon. Here is the
> >>>>>>>>>>>>> conclusions:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *1. about proposed methods in `TableEnvironment`:*
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the original proposed methods:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> TableEnvironment.createDmlBatch(): DmlBatch
> >>>>>>>>>>>>> TableEnvironment.executeStatement(String statement):
> >>>>>>>>>>>>> ResultTable
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the new proposed methods:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // we should not use abbreviations in the API, and the term
> >>>>> "Batch"
> >>>>>>>> is
> >>>>>>>>>>>>> easily confused with batch/streaming processing
> >>>>>>>>>>>>> TableEnvironment.createStatementSet(): StatementSet
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // every method that takes SQL should have `Sql` in its name
> >>>>>>>>>>>>> // supports multiline statement ???
> >>>>>>>>>>>>> TableEnvironment.executeSql(String statement): TableResult
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> // new methods. supports explaining DQL and DML
> >>>>>>>>>>>>> TableEnvironment.explainSql(String statement,
> ExplainDetail...
> >>>>>>>>>> details):
> >>>>>>>>>>>>> String
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *2. about proposed related classes:*
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the original proposed classes:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> interface DmlBatch {
> >>>>>>>>>>>>>           void addInsert(String insert);
> >>>>>>>>>>>>>           void addInsert(String targetPath, Table table);
> >>>>>>>>>>>>>           ResultTable execute() throws Exception ;
> >>>>>>>>>>>>>           String explain(boolean extended);
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> public interface ResultTable {
> >>>>>>>>>>>>>           TableSchema getResultSchema();
> >>>>>>>>>>>>>           Iterable<Row> getResultRows();
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> the new proposed classes:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> interface StatementSet {
> >>>>>>>>>>>>>           // every method that takes SQL should have `Sql` in
> >>>>>>>>>>>>> its
> >>>>>> name
> >>>>>>>>>>>>>           // return StatementSet instance for fluent
> programming
> >>>>>>>>>>>>>           addInsertSql(String statement): StatementSet
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // return StatementSet instance for fluent
> programming
> >>>>>>>>>>>>>           addInsert(String tablePath, Table table):
> StatementSet
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // new method. support overwrite mode
> >>>>>>>>>>>>>           addInsert(String tablePath, Table table, boolean
> >>>>>> overwrite):
> >>>>>>>>>>>>> StatementSet
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           explain(): String
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // new method. supports adding more details for the
> >>>>> result
> >>>>>>>>>>>>>           explain(ExplainDetail... extraDetails): String
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // throw exception ???
> >>>>>>>>>>>>>           execute(): TableResult
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> interface TableResult {
> >>>>>>>>>>>>>           getTableSchema(): TableSchema
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // avoid custom parsing of an "OK" row in
> programming
> >>>>>>>>>>>>>           getResultKind(): ResultKind
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // instead of `get` make it explicit that this is
> >>>>>>>>>>>>> might
> >>>>> be
> >>>>>>>>>>>> triggering
> >>>>>>>>>>>>> an expensive operation
> >>>>>>>>>>>>>           collect(): Iterable<Row>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>           // for fluent programming
> >>>>>>>>>>>>>           print(): Unit
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> enum ResultKind {
> >>>>>>>>>>>>>           SUCCESS, // for DDL, DCL and statements with a
> simple
> >>>>> "OK"
> >>>>>>>>>>>>>           SUCCESS_WITH_CONTENT, // rows with important
> >>>>>>>>>>>>> content are
> >>>>>>>>>> available
> >>>>>>>>>>>>> (DML, DQL)
> >>>>>>>>>>>>> }
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> *3. new proposed methods in `Table`*
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> `Table.insertInto()` will be deprecated, and the following
> >>>>> methods
> >>>>>>>> are
> >>>>>>>>>>>>> introduced:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Table.executeInsert(String tablePath): TableResult
> >>>>>>>>>>>>> Table.executeInsert(String tablePath, boolean overwrite):
> >>>>>> TableResult
> >>>>>>>>>>>>> Table.explain(ExplainDetail... details): String
> >>>>>>>>>>>>> Table.execute(): TableResult
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> There are two issues need further discussion, one is whether
> >>>>>>>>>>>>> `TableEnvironment.executeSql(String statement): TableResult`
> >>>>> needs
> >>>>>> to
> >>>>>>>>>>>>> support multiline statement (or whether `TableEnvironment`
> >>>>>>>>>>>>> needs
> >>>>> to
> >>>>>>>>>>>> support
> >>>>>>>>>>>>> multiline statement), and another one is whether
> >>>>>>>>>> `StatementSet.execute()`
> >>>>>>>>>>>>> needs to throw exception.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> please refer to the feedback document [2] for the details.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Any suggestions are warmly welcomed!
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> [1]
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>
> https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
> >>>>>
> >>>>>>>>>>>>> [2]
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>
> >>>>>>
> >>>>>
> >>
> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
> >>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Best,
> >>>>>>>>>>>>> Godfrey
> >>>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>
> >>
> >
>

Re: [DISCUSS] FLIP-84 Feedback Summary

dwysakowicz

When considering the multi-line support I think it is helpful to start with a use case in mind. In my opinion consumers of this method will be:

  1. sql-client
  2. third-party SQL-based platforms

@Godfrey As for the quit/source/... commands, I think those belong to the responsibility of the aforementioned tools. They should not be understood by the TableEnvironment. What would quit on a TableEnvironment do? Moreover, such commands should be prefixed appropriately. It's a common practice to e.g. prefix them with ! or : to mark them as meta commands of the tool rather than a query.

I also don't see why platform users need to know the kind of the query in order to use the proposed method. They should get the type from the TableResult's ResultKind. If the ResultKind is SUCCESS, it was a DCL/DDL. If it is SUCCESS_WITH_CONTENT, it was a DML/DQL. If that's not enough, we can enrich the TableResult with a more explicit kind of query, but so far I don't see such a need.
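The dispatch rule Dawid describes can be sketched as follows. This is an illustrative stand-in, not the real Flink classes: the enum mirrors the `ResultKind` proposed in this thread, and `describe` is what a platform might write instead of parsing the statement text.

```java
// Stand-in for the proposed ResultKind enum (illustrative, not Flink API).
enum ResultKind {
    SUCCESS,              // DDL/DCL: a plain "OK"
    SUCCESS_WITH_CONTENT  // DML/DQL: rows worth rendering
}

class ResultDispatch {
    // A platform branches on the result kind instead of inspecting the SQL.
    static String describe(ResultKind kind) {
        switch (kind) {
            case SUCCESS:
                return "acknowledge with OK";
            case SUCCESS_WITH_CONTENT:
                return "iterate and render rows";
            default:
                throw new IllegalStateException("unknown kind: " + kind);
        }
    }
}
```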

@Kurt In those cases I would assume the developers want to present results of the queries anyway. Moreover I think it is safe to assume they can adhere to such a contract that the results must be iterated.

For direct users of TableEnvironment/Table API this method does not make much sense anyway, in my opinion. I think we can rather safely assume in this scenario they do not want to submit multiple queries at a single time.

Best,

Dawid


On 01/04/2020 15:07, Kurt Young wrote:
One comment on `executeMultilineSql`: I'm afraid users might sometimes
forget to iterate the returned iterator, e.g. a user submits a bunch of
DDLs and expects the framework to execute them one by one. But it won't.

Best,
Kurt


On Wed, Apr 1, 2020 at 5:10 PM Aljoscha Krettek [hidden email] wrote:

Agreed to what Dawid and Timo said.

To answer your question about multi-line SQL: no, we don't think we need
this in Flink 1.11, we only wanted to make sure that the interfaces that
we now put in place will potentially allow this in the future.

Best,
Aljoscha

On 01.04.20 09:31, godfrey he wrote:
Hi, Timo & Dawid,

Thanks so much for the effort of `multiline statements supporting`,
I have a few questions about this method:

1. Users can control the execution logic well through the proposed method
if they know what the statements are (whether a statement is a DDL, a DML
or something else). But if a statement comes from a file, users do not
know what the statements are, so the execution behavior is unclear.
As a platform user, I think this method is hard to use, unless the
platform defines a set of rules about the statement order, such as: no
SELECT in the middle, DML must be at the tail of the SQL file (which may
be the most common case in production environments). Otherwise the
platform must parse the SQL first to know what the statements are. If it
does that, the platform can handle all cases through `executeSql` and
`StatementSet`.

2. The SQL client also can't use `executeMultilineSql` to support
multiline statements, because there are some special commands introduced
in the SQL client, such as `quit`, `source`, `load jar` (the last does
not exist now, but maybe we need it to support dynamic table sources and
UDFs). Does TableEnvironment also support those commands?

3. btw, must we have this feature in release-1.11? I find there are a few
use cases in the feedback document whose behavior is unclear now.

Regarding "change the return value from `Iterable<Row>` to
`Iterator<Row>`", I couldn't agree more with this change. Just as Dawid
mentioned, "The contract of the Iterable#iterator is that it returns a
new iterator each time, which effectively means we can iterate the
results multiple times." We do not support iterating the results
multiple times. If we wanted to do that, the client would have to buffer
all results, but that's impossible for a streaming job.
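The single-pass argument can be made concrete with a small sketch (plain Java, with `String` standing in for `Row`; nothing here is Flink API): an `Iterator` hands out each row exactly once, whereas `Iterable` would promise a fresh pass per `iterator()` call, which would force the client to buffer everything.

```java
import java.util.Iterator;
import java.util.List;

class SinglePassDemo {
    // Wraps rows arriving from the cluster; rows can only be consumed
    // once, which is exactly the Iterator (not Iterable) contract.
    static Iterator<String> collect(List<String> rowsFromCluster) {
        return rowsFromCluster.iterator();
    }

    // Consumes the iterator fully and counts the rows seen.
    static int drain(Iterator<String> it) {
        int n = 0;
        while (it.hasNext()) { it.next(); n++; }
        return n;
    }
}
```

A first `drain` sees all rows; calling `hasNext()` afterwards returns false. There is no second pass unless the client buffered the rows itself.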

Best,
Godfrey

Dawid Wysakowicz [hidden email] 于2020年4月1日周三 上午3:14写道:

Thank you Timo for the great summary! It covers (almost) all the topics.
Even though in the end we are not suggesting much changes to the current
state of FLIP I think it is important to lay out all possible use cases
so that we do not change the execution model every release.

There is one additional thing we discussed. Could we change the result
type of TableResult#collect to Iterator<Row>? Even though those
interfaces do not differ much. I think Iterator better describes that
the results might not be materialized on the client side, but can be
retrieved on a per record basis. The contract of the Iterable#iterator
is that it returns a new iterator each time, which effectively means we
can iterate the results multiple times. Iterating the results multiple
times is not possible when we don't retrieve all the results from the
cluster at once.
I think we should also use Iterator for
TableEnvironment#executeMultilineSql(String statements):
Iterator<TableResult>.

Best,

Dawid

On 31/03/2020 19:27, Timo Walther wrote:
Hi Godfrey,

Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
particular, we discussed how the current status of the FLIP and the
future requirements around multiline statements, async/sync, collect()
fit together.

We also updated the FLIP-84 Feedback Summary document [1] with some
use cases.

We believe that we found a good solution that also fits to what is in
the current FLIP. So no bigger changes necessary, which is great!

Our findings were:

1. Async vs sync submission of Flink jobs:

Having a blocking `execute()` in DataStream API was rather a mistake.
Instead all submissions should be async because this allows supporting
both modes if necessary. Thus, submitting all queries async sounds
good to us. If users want to run a job sync, they can use the
JobClient and wait for completion (or collect() in case of batch jobs).
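A minimal sketch of that model (illustrative names only: `JobHandle` stands in for Flink's `JobClient`, and the fake executor completes immediately): submission returns right away, and a caller that wants synchronous semantics simply waits on the handle.

```java
import java.util.concurrent.CompletableFuture;

// Stand-in for JobClient: exposes the job's completion as a future.
class JobHandle {
    private final CompletableFuture<String> completion;
    JobHandle(CompletableFuture<String> completion) { this.completion = completion; }
    CompletableFuture<String> getJobExecutionResult() { return completion; }
}

class SqlRunner {
    // Always submits async and returns immediately after submission.
    static JobHandle executeSql(String statement) {
        return new JobHandle(
            CompletableFuture.supplyAsync(() -> "FINISHED: " + statement));
    }
}
```

Sync behavior is then opt-in: `SqlRunner.executeSql("INSERT INTO t ...").getJobExecutionResult().join()` blocks until the job finishes, while a streaming caller keeps the handle and moves on.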

2. Multi-statement execution:

For the multi-statement execution, we don't see a contradiction with
the async execution behavior. We imagine a method like:

TableEnvironment#executeMultilineSql(String statements):
Iterable<TableResult>

Where the `Iterator#next()` method would trigger the next statement
submission. This allows a caller to decide synchronously when to
submit statements async to the cluster. Thus, a service such as the
SQL Client can handle the result of each statement individually and
process statement by statement sequentially.
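The laziness described here can be sketched like this (hypothetical names, with `String` standing in for `TableResult`): nothing is submitted until the caller advances the iterator, which also makes concrete Kurt's concern elsewhere in the thread that statements which are never iterated are never executed.

```java
import java.util.Iterator;
import java.util.List;

class MultilineExecutor {
    // Returns a lazy iterator over per-statement results; each next() call
    // "submits" the following statement (recorded in `submitted` here).
    static Iterator<String> executeMultilineSql(List<String> statements,
                                                List<String> submitted) {
        Iterator<String> stmts = statements.iterator();
        return new Iterator<String>() {
            public boolean hasNext() { return stmts.hasNext(); }
            public String next() {
                String stmt = stmts.next();
                submitted.add(stmt); // submission happens lazily, right here
                return "RESULT(" + stmt + ")";
            }
        };
    }
}
```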

3. The role of TableResult and result retrieval in general

`TableResult` is similar to `JobClient`. Instead of returning a
`CompletableFuture` of something, it is a concrete util class where
some methods have the behavior of completable future (e.g. collect(),
print()) and some are already completed (getTableSchema(),
getResultKind()).

`StatementSet#execute()` returns a single `TableResult` because the
order is undefined in a set and all statements have the same schema.
Its `collect()` will return a row for each executed `INSERT INTO` in
the order of statement definition.
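That contract can be sketched with hypothetical stand-ins (not the real `StatementSet`/`TableResult` classes): fluent adds, a single `execute()`, and one result entry per `INSERT` in definition order.

```java
import java.util.ArrayList;
import java.util.List;

class StatementSetSketch {
    private final List<String> inserts = new ArrayList<>();

    // Fluent: returns this so adds can be chained.
    StatementSetSketch addInsertSql(String statement) {
        inserts.add(statement);
        return this;
    }

    // One combined result: one entry per INSERT, in definition order.
    List<String> execute() {
        List<String> rows = new ArrayList<>();
        for (String s : inserts) {
            rows.add("OK: " + s);
        }
        return rows;
    }
}
```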

For simple `SELECT * FROM ...`, the query execution might block until
`collect()` is called to pull buffered rows from the job (from
socket/REST API what ever we will use in the future). We can say that
a statement finished successfully, when the `collect#Iterator#hasNext`
has returned false.

I hope this summarizes our discussion @Dawid/Aljoscha/Klou?

It would be great if we can add these findings to the FLIP before we
start voting.

One minor thing: some `execute()` methods still throw a checked
exception; can we remove that from the FLIP? Also the above mentioned
`Iterator#next()` would trigger an execution without throwing a
checked exception.

Thanks,
Timo

[1]


          
https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
On 31.03.20 06:28, godfrey he wrote:
Hi, Timo & Jark

Thanks for your explanation.
Agree with you that execution should always be async,
and the sync execution scenario can be covered by async execution.
It helps provide a unified entry point for batch and streaming.
I think we can also use sync execution for some testing.
So, I agree with you that we provide `executeSql` method and it's
async
method.
If we want sync method in the future, we can add method named
`executeSqlSync`.

I think we've reached an agreement. I will update the document, and
start
voting process.

Best,
Godfrey


Jark Wu [hidden email] 于2020年3月31日周二 上午12:46写道:

Hi,

I didn't follow the full discussion.
But I share the same concern with Timo that streaming queries should
always
be async.
Otherwise, I can imagine it will cause a lot of confusion and problems
if
users don't deeply keep the "sync" in mind (e.g. client hangs).
Besides, the streaming mode is still the majority use cases of Flink
and
Flink SQL. We should put the usability at a high priority.

Best,
Jark


On Mon, 30 Mar 2020 at 23:27, Timo Walther [hidden email]
wrote:

                  
Hi Godfrey,

maybe I wasn't expressing my biggest concern enough in my last mail.
Even in a singleline and sync execution, I think that streaming
queries
should not block the execution. Otherwise it is not possible to call
collect() or print() on them afterwards.

"there are too many things need to discuss for multiline":

True, I don't want to solve all of them right now. But what I know
is
that our newly introduced methods should fit into a multiline
execution.
There is no big difference of calling `executeSql(A),
executeSql(B)` and
processing a multiline file `A;\nB;`.

I think the example that you mentioned can simply be undefined for
now.
Currently, no catalog is modifying data but just metadata. This is a
separate discussion.

"result of the second statement is indeterministic":

Sure this is indeterministic. But this is the implementer's fault
and we
cannot forbid such pipelines.

How about we always execute streaming queries async? It would
unblock
executeSql() and multiline statements.

Having a `executeSqlAsync()` is useful for batch. However, I don't
want
`sync/async` be the new batch/stream flag. The execution behavior
should
come from the query itself.

Regards,
Timo


On 30.03.20 11:12, godfrey he wrote:
Hi Timo,

Agree with you that streaming queries is our top priority,
but I think there are too many things need to discuss for multiline
statements:
e.g.
1. what's the behavior of DDL and DML mixing for async execution:
create table t1 xxx;
create table t2 xxx;
insert into t2 select * from t1 where xxx;
drop table t1; // t1 may be a MySQL table, the data will also be
deleted.
t1 is dropped when "insert" job is running.

2. what's the behavior of unified scenario for async execution:
(as you
mentioned)
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

The result of the second statement is indeterministic, because the
first
statement maybe is running.
I think we need to put a lot of effort to define the behavior of
logically
related queries.

In this FLIP, I suggest we only handle single statement, and we
also
introduce an async execute method
which is more important and more often used for users.

For the sync methods (like `TableEnvironment.executeSql` and
`StatementSet.execute`),
the result will be returned until the job is finished. The
following
methods will be introduced in this FLIP:

    /**
     * Asynchronously execute the given single statement
     */
TableEnvironment.executeSqlAsync(String statement): TableResult

/**
    * Asynchronously execute the dml statements as a batch
    */
StatementSet.executeAsync(): TableResult

public interface TableResult {
      /**
       * return JobClient for DQL and DML in async mode, else
return
Optional.empty
       */
      Optional<JobClient> getJobClient();
}

what do you think?

Best,
Godfrey

Timo Walther [hidden email] 于2020年3月26日周四 下午9:15写道:

Hi Godfrey,

executing streaming queries must be our top priority because this
is
what distinguishes Flink from competitors. If we change the
execution
behavior, we should think about the other cases as well to not
break
the
API a third time.

I fear that just having an async execute method will not be enough
because users should be able to mix streaming and batch queries
in a
unified scenario.

If I remember it correctly, we had some discussions in the past
about
what decides about the execution mode of a query. Currently, we
would
like to let the query decide, not derive it from the sources.

So I could imagine a multiline pipeline as:

USE CATALOG 'mycat';
INSERT INTO t1 SELECT * FROM s;
INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;

For executeMultilineSql():

sync because regular SQL
sync because regular Batch SQL
async because Streaming SQL

For executeAsyncMultilineSql():

async because everything should be async
async because everything should be async
async because everything should be async

What we should not start for executeAsyncMultilineSql():

sync because DDL
async because everything should be async
async because everything should be async

What are your thoughts here?

Regards,
Timo


On 26.03.20 12:50, godfrey he wrote:
Hi Timo,

I agree with you that streaming queries mostly need async
execution.
In fact, our original plan is only introducing sync methods in
this
FLIP,
and async methods (like "executeSqlAsync") will be introduced in
the
future
which is mentioned in the appendix.

Maybe the async methods also need to be considered in this FLIP.

I think sync methods is also useful for streaming which can be
used
to
run
bounded source.
Maybe we should check whether all sources are bounded in sync
execution
mode.

Also, if we block for streaming queries, we could never support
multiline files. Because the first INSERT INTO would block the
further
execution.
agree with you, we need async method to submit multiline files,
and files should be limited that the DQL and DML should be
always in
the
end for streaming.

Best,
Godfrey

Timo Walther [hidden email] 于2020年3月26日周四 下午4:29写道:


Hi Godfrey,

having control over the job after submission is a requirement
that
was
requested frequently (some examples are [1], [2]). Users would
like
to
get insights about the running or completed job. Including the
jobId,
jobGraph etc., the JobClient summarizes these properties.

It is good to have a discussion about synchronous/asynchronous
submission now to have a complete execution picture.

I thought we submit streaming queries mostly async and just
wait for
the
successful submission. If we block for streaming queries, how
can we
collect() or print() results?

Also, if we block for streaming queries, we could never support
multiline files. Because the first INSERT INTO would block the
further
execution.

If we decide to block entirely on streaming queries, we need the
async
execution methods in the design already. However, I would
rather go
for
non-blocking streaming queries. Also with the `EMIT STREAM` key
word
in
mind that we might add to SQL statements soon.

Regards,
Timo

[1] https://issues.apache.org/jira/browse/FLINK-16761
[2] https://issues.apache.org/jira/browse/FLINK-12214

On 25.03.20 16:30, godfrey he wrote:
Hi Timo,

Thanks for the updating.

Regarding to "multiline statement support", I'm also fine that
`TableEnvironment.executeSql()` only supports single line
statement,
and
we
can support multiline statement later (needs more discussion
about
this).
Regarding "StatementSet.explain()", I don't have strong
opinions
about
that.

Regarding to "TableResult.getJobClient()", I think it's
unnecessary.
The
reason is: first, many statements (e.g. DDL, show xx, use xx)
will
not
submit a Flink job. second, `TableEnvironment.executeSql()` and
`StatementSet.execute()` are synchronous method, `TableResult`
will
be
returned only after the job is finished or failed.

Regarding to "whether StatementSet.execute() needs to throw
exception", I
think we should choose a unified way to tell whether the
execution
is
successful. If `TableResult` contains ERROR kind (non-runtime
exception),
users need to not only check the result but also catch the
runtime
exception in their code. or `StatementSet.execute()` does not
throw
any
exception (including runtime exception), all exception
messages are
in
the
result.  I prefer "StatementSet.execute() needs to throw
exception".
cc
@Jark
Wu [hidden email]

I will update the agreed parts to the document first.

Best,
Godfrey


Timo Walther [hidden email] wrote on Wed, 25 Mar 2020 at 18:51:

Hi Godfrey,

thanks for starting the discussion on the mailing list. And sorry again
for the late reply to FLIP-84. I have updated the Google doc one more
time to incorporate the offline discussions.

From Dawid's and my view, it is fine to postpone the multiline support
to a separate method. This can be future work even though we will need
it rather soon.

If there are no objections, I suggest updating FLIP-84 again and having
another voting process.

Thanks,
Timo


On 25.03.20 11:17, godfrey he wrote:
Hi community,
Timo, Fabian and Dawid have some feedback about FLIP-84 [1]. The
feedback is all about the newly introduced methods. We had a discussion
yesterday, and most of the feedback has been agreed upon. Here are the
conclusions:

*1. about proposed methods in `TableEnvironment`:*

the original proposed methods:

TableEnvironment.createDmlBatch(): DmlBatch
TableEnvironment.executeStatement(String statement): ResultTable

the new proposed methods:

// we should not use abbreviations in the API, and the term "Batch" is
// easily confused with batch/streaming processing
TableEnvironment.createStatementSet(): StatementSet

// every method that takes SQL should have `Sql` in its name
// supports multiline statements ???
TableEnvironment.executeSql(String statement): TableResult

// new method. supports explaining DQL and DML
TableEnvironment.explainSql(String statement, ExplainDetail... details): String


*2. about proposed related classes:*

the original proposed classes:

interface DmlBatch {
    void addInsert(String insert);
    void addInsert(String targetPath, Table table);
    ResultTable execute() throws Exception;
    String explain(boolean extended);
}

public interface ResultTable {
    TableSchema getResultSchema();
    Iterable<Row> getResultRows();
}

the new proposed classes:

interface StatementSet {
    // every method that takes SQL should have `Sql` in its name
    // returns the StatementSet instance for fluent programming
    addInsertSql(String statement): StatementSet

    // returns the StatementSet instance for fluent programming
    addInsert(String tablePath, Table table): StatementSet

    // new method. supports overwrite mode
    addInsert(String tablePath, Table table, boolean overwrite): StatementSet

    explain(): String

    // new method. supports adding more details for the result
    explain(ExplainDetail... extraDetails): String

    // throw exception ???
    execute(): TableResult
}

interface TableResult {
    getTableSchema(): TableSchema

    // avoid custom parsing of an "OK" row in programming
    getResultKind(): ResultKind

    // instead of `get`, make it explicit that this might trigger
    // an expensive operation
    collect(): Iterable<Row>

    // for fluent programming
    print(): Unit
}

enum ResultKind {
    SUCCESS, // for DDL, DCL and statements with a simple "OK"
    SUCCESS_WITH_CONTENT // rows with important content are available (DML, DQL)
}


*3. new proposed methods in `Table`*

`Table.insertInto()` will be deprecated, and the following methods are
introduced:

Table.executeInsert(String tablePath): TableResult
Table.executeInsert(String tablePath, boolean overwrite): TableResult
Table.explain(ExplainDetail... details): String
Table.execute(): TableResult
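To make the proposed fluent `StatementSet` style concrete, here is a minimal
self-contained sketch. `StatementSetSketch` is a hypothetical stand-in for the
proposed interface (the real `execute()` would return a `TableResult`; the
stand-in just counts buffered statements so the example can run on its own):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the proposed StatementSet, illustrating the
// fluent add-then-execute pattern. Not Flink's actual implementation.
class StatementSetSketch {
    private final List<String> statements = new ArrayList<>();

    // returns this, so calls can be chained (fluent programming)
    StatementSetSketch addInsertSql(String statement) {
        statements.add(statement);
        return this;
    }

    // the real execute() would submit one Flink job and return a TableResult;
    // here we just report how many statements would be part of that job
    int execute() {
        return statements.size();
    }

    public static void main(String[] args) {
        int count = new StatementSetSketch()
                .addInsertSql("INSERT INTO t1 SELECT * FROM s")
                .addInsertSql("INSERT INTO t2 SELECT * FROM s")
                .execute();
        System.out.println(count); // 2
    }
}
```

Because all buffered statements go into a single `execute()`, they can be
optimized and submitted together, which is the motivation given for a set-like
builder instead of the old `DmlBatch`.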

There are two issues that need further discussion: one is whether
`TableEnvironment.executeSql(String statement): TableResult` needs to
support multiline statements (or whether `TableEnvironment` needs to
support multiline statements at all), and the other is whether
`StatementSet.execute()` needs to throw an exception.

Please refer to the feedback document [2] for the details.

Any suggestions are warmly welcomed!

[1] https://wiki.apache.org/confluence/pages/viewpage.action?pageId=134745878
[2] https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit
Best,
Godfrey


Re: [DISCUSS] FLIP-84 Feedback Summary

Timo Walther-2
Hi Godfrey,

first of all, I agree with Dawid. The multiline story is not completed
by this FLIP. It just verifies the big picture.

1. "control the execution logic through the proposed method if they know
what the statements are"

This is a good point that Fabian also raised in the linked Google doc. I
could also imagine returning a more complicated POJO when calling
`executeMultiSql()`.

The POJO would include some `getSqlProperties()` such that a platform
gets insights into the query before executing. We could also trigger the
execution more explicitly instead of hiding it behind an iterator.

2. "there are some special commands introduced in SQL client"

For platforms and SQL Client specific commands, we could offer a hook to
the parser or a fallback parser in case the regular table environment
parser cannot deal with the statement.

However, all of that is future work and can be discussed in a separate FLIP.

3. +1 for the `Iterator` instead of `Iterable`.

4. "we should convert the checked exception to unchecked exception"

Yes, I meant using a runtime exception instead of a checked exception.
There was no consensus on putting the exception into the `TableResult`.

Regards,
Timo

On 01.04.20 15:35, Dawid Wysakowicz wrote:

> When considering the multiline support I think it is helpful to start
> with a use case in mind. In my opinion the consumers of this method will be:
>
>  1. the SQL Client
>  2. third-party SQL-based platforms
>
> @Godfrey As for the quit/source/... commands: I think those belong to
> the responsibility of the aforementioned tools. They should not be
> understood by the TableEnvironment. What would quit on a
> TableEnvironment do? Moreover, I think such commands should be prefixed
> appropriately. It's a common practice to prefix those with e.g. ! or :
> to say they are meta commands of the tool rather than a query.
>
> I also don't necessarily understand why platform users need to know the
> kind of the query to use the proposed method. They should get the type
> from TableResult#ResultKind. If the ResultKind is SUCCESS, it was a
> DCL/DDL. If it is SUCCESS_WITH_CONTENT, it was a DML/DQL. If that's not
> enough, we can enrich the TableResult with a more explicit kind of
> query, but so far I don't see such a need.
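Dawid's point can be sketched with a tiny example: a platform branches on the
proposed `ResultKind` instead of parsing the statement up front. The enum and
the `describe` helper below are illustrative stand-ins, not Flink classes:

```java
// Illustrative stand-in for the proposed ResultKind enum.
enum ResultKindSketch {
    SUCCESS,              // DDL/DCL: a simple "OK", nothing to display
    SUCCESS_WITH_CONTENT  // DML/DQL: rows are available via collect()
}

class ResultKindDemo {
    // A platform can decide how to present a result without knowing the
    // statement type in advance.
    static String describe(ResultKindSketch kind) {
        switch (kind) {
            case SUCCESS:
                return "statement succeeded, no rows to show";
            case SUCCESS_WITH_CONTENT:
                return "statement succeeded, iterate rows with collect()";
            default:
                throw new IllegalStateException("unexpected kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(ResultKindSketch.SUCCESS));
        System.out.println(describe(ResultKindSketch.SUCCESS_WITH_CONTENT));
    }
}
```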
>
> @Kurt In those cases I would assume the developers want to present
> results of the queries anyway. Moreover I think it is safe to assume
> they can adhere to such a contract that the results must be iterated.
>
> For direct users of TableEnvironment/Table API this method does not make
> much sense anyway, in my opinion. I think we can rather safely assume
> that in this scenario they do not want to submit multiple queries at a time.
>
> Best,
>
> Dawid
>
>
> On 01/04/2020 15:07, Kurt Young wrote:
>> One comment on `executeMultilineSql`: I'm afraid users might sometimes
>> forget to iterate the returned iterators, e.g. a user submits a bunch of
>> DDLs and expects the framework to execute them one by one. But it doesn't.
>>
>> Best,
>> Kurt
>>
>>
>> On Wed, Apr 1, 2020 at 5:10 PM Aljoscha Krettek<[hidden email]>  wrote:
>>
>>> Agreed to what Dawid and Timo said.
>>>
>>> To answer your question about multi line SQL: no, we don't think we need
>>> this in Flink 1.11, we only wanted to make sure that the interfaces that
>>> we now put in place will potentially allow this in the future.
>>>
>>> Best,
>>> Aljoscha
>>>
>>> On 01.04.20 09:31, godfrey he wrote:
>>>> Hi, Timo & Dawid,
>>>>
>>>> Thanks so much for the effort on `multiline statement support`.
>>>> I have a few questions about this method:
>>>>
>>>> 1. Users can well control the execution logic through the proposed
>>>> method if they know what the statements are (a statement is a DDL, a
>>>> DML or something else). But if a statement comes from a file, users do
>>>> not know what the statements are, so the execution behavior is unclear.
>>>> As a platform user, I think this method is hard to use, unless the
>>>> platform defines a set of rules about statement order, such as: no
>>>> SELECT in the middle, DML must be at the tail of the SQL file (which
>>>> may be the most common case in production environments).
>>>> Otherwise the platform must parse the SQL first to know what the
>>>> statements are. If it does that, the platform can handle all cases
>>>> through `executeSql` and `StatementSet`.
>>>>
>>>> 2. The SQL Client also can't use `executeMultilineSql` to support
>>>> multiline statements, because there are some special commands
>>>> introduced in the SQL Client, such as `quit`, `source`, `load jar`
>>>> (which doesn't exist now, but maybe we need this command to support
>>>> dynamic table sources and UDFs).
>>>> Does TableEnvironment also support those commands?
>>>>
>>>> 3. Btw, must we have this feature in release-1.11? I find there are a
>>>> few use cases in the feedback document whose behavior is unclear now.
>>>>
>>>> Regarding "change the return value from `Iterable<Row>` to
>>>> `Iterator<Row>`": I couldn't agree more with this change. Just as Dawid
>>>> mentioned, "The contract of the Iterable#iterator is that it returns a
>>>> new iterator each time, which effectively means we can iterate the
>>>> results multiple times." We do not support iterating the results
>>>> multiple times. If we wanted to do that, the client would have to
>>>> buffer all results, which is impossible for streaming jobs.
>>>>
>>>> Best,
>>>> Godfrey
>>>>
>>>> Dawid Wysakowicz <[hidden email]> wrote on Wed, 1 Apr 2020 at 3:14 AM:
>>>>
>>>>> Thank you Timo for the great summary! It covers (almost) all the topics.
>>>>> Even though in the end we are not suggesting much changes to the current
>>>>> state of FLIP I think it is important to lay out all possible use cases
>>>>> so that we do not change the execution model every release.
>>>>>
>>>>> There is one additional thing we discussed. Could we change the result
>>>>> type of TableResult#collect to Iterator<Row>? Even though those
>>>>> interfaces do not differ much, I think Iterator better describes that
>>>>> the results might not be materialized on the client side, but can be
>>>>> retrieved on a per-record basis. The contract of Iterable#iterator is
>>>>> that it returns a new iterator each time, which effectively means we
>>>>> can iterate the results multiple times. Iterating the results multiple
>>>>> times is not possible when we don't retrieve all the results from the
>>>>> cluster at once.
>>>>> I think we should also use Iterator for
>>>>> TableEnvironment#executeMultilineSql(String statements):
>>>>> Iterator<TableResult>.
>>>>>
>>>>> Best,
>>>>>
>>>>> Dawid
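The distinction Dawid draws is easy to demonstrate with plain collections: an
`Iterable` hands out a fresh `Iterator` on every call, so iteration is
repeatable, while a bare `Iterator` is single-pass, which matches results
streamed from the cluster without client-side buffering. This sketch uses a
plain `List`, not Flink classes:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Iterable promises a fresh Iterator per call (repeatable iteration);
// a plain Iterator can only be consumed once.
class SinglePass {
    // consume an iterator fully and count its elements
    static int drain(Iterator<Integer> it) {
        int count = 0;
        while (it.hasNext()) {
            it.next();
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        List<Integer> rows = Arrays.asList(1, 2, 3);
        // Iterable: each call to iterator() starts over
        System.out.println(drain(rows.iterator()) + " " + drain(rows.iterator())); // 3 3
        // Iterator: once drained, it stays empty
        Iterator<Integer> once = rows.iterator();
        System.out.println(drain(once) + " " + drain(once)); // 3 0
    }
}
```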
>>>>>
>>>>> On 31/03/2020 19:27, Timo Walther wrote:
>>>>>> Hi Godfrey,
>>>>>>
>>>>>> Aljoscha, Dawid, Klou, and I had another discussion around FLIP-84. In
>>>>>> particular, we discussed how the current status of the FLIP and the
>>>>>> future requirements around multiline statements, async/sync, collect()
>>>>>> fit together.
>>>>>>
>>>>>> We also updated the FLIP-84 Feedback Summary document [1] with some
>>>>>> use cases.
>>>>>>
>>>>>> We believe that we found a good solution that also fits to what is in
>>>>>> the current FLIP. So no bigger changes necessary, which is great!
>>>>>>
>>>>>> Our findings were:
>>>>>>
>>>>>> 1. Async vs sync submission of Flink jobs:
>>>>>>
>>>>>> Having a blocking `execute()` in DataStream API was rather a mistake.
>>>>>> Instead all submissions should be async because this allows supporting
>>>>>> both modes if necessary. Thus, submitting all queries async sounds
>>>>>> good to us. If users want to run a job sync, they can use the
>>>>>> JobClient and wait for completion (or collect() in case of batch jobs).
>>>>>>
>>>>>> 2. Multi-statement execution:
>>>>>>
>>>>>> For the multi-statement execution, we don't see a contradiction with
>>>>>> the async execution behavior. We imagine a method like:
>>>>>>
>>>>>> TableEnvironment#executeMultilineSql(String statements):
>>>>>> Iterable<TableResult>
>>>>>>
>>>>>> where the `Iterator#next()` method would trigger the next statement
>>>>>> submission. This allows a caller to decide synchronously when to
>>>>>> submit statements async to the cluster. Thus, a service such as the
>>>>>> SQL Client can handle the result of each statement individually and
>>>>>> process statement by statement sequentially.
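The contract described here, where nothing is submitted until the caller asks
for the next result, can be illustrated with a plain `Iterator`.
`LazySubmission` is a hypothetical stand-in; a real `executeMultilineSql`
would submit a Flink job inside `next()` instead of appending to a list:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical sketch of the proposed lazy-submission contract:
// calling next() is what triggers submission of the next statement.
class LazySubmission implements Iterator<String> {
    private final Iterator<String> statements;
    final List<String> submitted = new ArrayList<>(); // records "submissions"

    LazySubmission(List<String> statements) {
        this.statements = statements.iterator();
    }

    @Override
    public boolean hasNext() {
        return statements.hasNext();
    }

    @Override
    public String next() {
        String stmt = statements.next();
        submitted.add(stmt); // submission happens here, not at construction
        return "result of: " + stmt;
    }

    public static void main(String[] args) {
        LazySubmission results = new LazySubmission(
                Arrays.asList("CREATE TABLE t (x INT)", "INSERT INTO t SELECT 1"));
        System.out.println(results.submitted.size()); // 0: nothing submitted yet
        System.out.println(results.next());
        System.out.println(results.submitted.size()); // 1: first statement submitted
    }
}
```

A caller can thus show the result of statement N before statement N+1 is ever
sent to the cluster; it also means statements a caller never iterates over are
never executed, which is the pitfall Kurt raises above.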
>>>>>>
>>>>>> 3. The role of TableResult and result retrieval in general
>>>>>>
>>>>>> `TableResult` is similar to `JobClient`. Instead of returning a
>>>>>> `CompletableFuture` of something, it is a concrete util class where
>>>>>> some methods have the behavior of completable future (e.g. collect(),
>>>>>> print()) and some are already completed (getTableSchema(),
>>>>>> getResultKind()).
>>>>>>
>>>>>> `StatementSet#execute()` returns a single `TableResult` because the
>>>>>> order is undefined in a set and all statements have the same schema.
>>>>>> Its `collect()` will return a row for each executed `INSERT INTO` in
>>>>>> the order of statement definition.
>>>>>>
>>>>>> For a simple `SELECT * FROM ...`, the query execution might block
>>>>>> until `collect()` is called to pull buffered rows from the job (via a
>>>>>> socket/REST API, whatever we will use in the future). We can say that
>>>>>> a statement finished successfully when the `collect()` iterator's
>>>>>> `hasNext` has returned false.
>>>>>>
>>>>>> I hope this summarizes our discussion @Dawid/Aljoscha/Klou?
>>>>>>
>>>>>> It would be great if we can add these findings to the FLIP before we
>>>>>> start voting.
>>>>>>
>>>>>> One minor thing: some `execute()` methods still throw a checked
>>>>>> exception; can we remove that from the FLIP? Also the above mentioned
>>>>>> `Iterator#next()` would trigger an execution without throwing a
>>>>>> checked exception.
>>>>>>
>>>>>> Thanks,
>>>>>> Timo
>>>>>>
>>>>>> [1]
>>>>>>
>>> https://docs.google.com/document/d/1ueLjQWRPdLTFB_TReAyhseAX-1N3j4WYWD0F02Uau0E/edit#
>>>>>> On 31.03.20 06:28, godfrey he wrote:
>>>>>>> Hi, Timo & Jark
>>>>>>>
>>>>>>> Thanks for your explanation.
>>>>>>> Agree with you that streaming execution should always be async,
>>>>>>> and the sync execution scenario can be covered by async execution.
>>>>>>> It helps provide a unified entry point for batch and streaming.
>>>>>>> I think we can also use sync execution for some testing.
>>>>>>> So, I agree with you that we provide the `executeSql` method as an
>>>>>>> async method.
>>>>>>> If we want a sync method in the future, we can add a method named
>>>>>>> `executeSqlSync`.
>>>>>>>
>>>>>>> I think we've reached an agreement. I will update the document, and
>>>>>>> start
>>>>>>> voting process.
>>>>>>>
>>>>>>> Best,
>>>>>>> Godfrey
>>>>>>>
>>>>>>>
>>>>>>> Jark Wu <[hidden email]> wrote on Tue, 31 Mar 2020 at 12:46 AM:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> I didn't follow the full discussion.
>>>>>>>> But I share the same concern with Timo that streaming queries should
>>>>>>>> always
>>>>>>>> be async.
>>>>>>>> Otherwise, I can imagine it will cause a lot of confusion and
>>>>>>>> problems if users don't deeply keep the "sync" in mind (e.g. the
>>>>>>>> client hangs).
>>>>>>>> Besides, the streaming mode is still the majority use cases of Flink
>>>>>>>> and
>>>>>>>> Flink SQL. We should put the usability at a high priority.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Jark
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 30 Mar 2020 at 23:27, Timo Walther<[hidden email]>
>>> wrote:
>>>>>>>>> Hi Godfrey,
>>>>>>>>>
>>>>>>>>> maybe I wasn't expressing my biggest concern enough in my last mail.
>>>>>>>>> Even in a singleline and sync execution, I think that streaming
>>>>>>>>> queries
>>>>>>>>> should not block the execution. Otherwise it is not possible to call
>>>>>>>>> collect() or print() on them afterwards.
>>>>>>>>>
>>>>>>>>> "there are too many things need to discuss for multiline":
>>>>>>>>>
>>>>>>>>> True, I don't want to solve all of them right now. But what I know
>>> is
>>>>>>>>> that our newly introduced methods should fit into a multiline
>>>>>>>>> execution.
>>>>>>>>> There is no big difference between calling `executeSql(A),
>>>>>>>>> executeSql(B)` and processing a multiline file `A;\nB;`.
>>>>>>>>>
>>>>>>>>> I think the example that you mentioned can simply be undefined for
>>>>>>>>> now.
>>>>>>>>> Currently, no catalog is modifying data but just metadata. This is a
>>>>>>>>> separate discussion.
>>>>>>>>>
>>>>>>>>> "result of the second statement is indeterministic":
>>>>>>>>>
>>>>>>>>> Sure, this is indeterministic. But this is the implementer's fault,
>>>>>>>>> and we cannot forbid such pipelines.
>>>>>>>>>
>>>>>>>>> How about we always execute streaming queries async? It would
>>>>>>>>> unblock executeSql() and multiline statements.
>>>>>>>>>
>>>>>>>>> Having an `executeSqlAsync()` is useful for batch. However, I don't
>>>>>>>>> want `sync/async` to be the new batch/stream flag. The execution
>>>>>>>>> behavior should come from the query itself.
>>>>>>>>>
>>>>>>>>> Regards,
>>>>>>>>> Timo
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 30.03.20 11:12, godfrey he wrote:
>>>>>>>>>> Hi Timo,
>>>>>>>>>>
>>>>>>>>>> Agree with you that streaming queries are our top priority,
>>>>>>>>>> but I think there are too many things that need discussion for
>>>>>>>>>> multiline statements, e.g.:
>>>>>>>>>> 1. What's the behavior of mixing DDL and DML for async execution?
>>>>>>>>>> create table t1 xxx;
>>>>>>>>>> create table t2 xxx;
>>>>>>>>>> insert into t2 select * from t1 where xxx;
>>>>>>>>>> drop table t1; // t1 may be a MySQL table; the data will also be deleted.
>>>>>>>>>> Here t1 is dropped while the "insert" job is still running.
>>>>>>>>>>
>>>>>>>>>> 2. What's the behavior of the unified scenario for async execution
>>>>>>>>>> (as you mentioned)?
>>>>>>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>>>>>>
>>>>>>>>>> The result of the second statement is indeterministic, because the
>>>>>>>>>> first statement may still be running.
>>>>>>>>>> I think we need to put a lot of effort into defining the behavior
>>>>>>>>>> of logically related queries.
>>>>>>>>>>
>>>>>>>>>> In this FLIP, I suggest we only handle single statements, and we
>>>>>>>>>> also introduce an async execute method,
>>>>>>>>>> which is more important and more often used by users.
>>>>>>>>>>
>>>>>>>>>> For the sync methods (like `TableEnvironment.executeSql` and
>>>>>>>>>> `StatementSet.execute`),
>>>>>>>>>> the result will be returned only after the job is finished. The
>>>>>>>>>> following methods will be introduced in this FLIP:
>>>>>>>>>>
>>>>>>>>>> /**
>>>>>>>>>>  * Asynchronously execute the given single statement.
>>>>>>>>>>  */
>>>>>>>>>> TableEnvironment.executeSqlAsync(String statement): TableResult
>>>>>>>>>>
>>>>>>>>>> /**
>>>>>>>>>>  * Asynchronously execute the DML statements as a batch.
>>>>>>>>>>  */
>>>>>>>>>> StatementSet.executeAsync(): TableResult
>>>>>>>>>>
>>>>>>>>>> public interface TableResult {
>>>>>>>>>>     /**
>>>>>>>>>>      * Return the JobClient for DQL and DML in async mode, else
>>>>>>>>>>      * return Optional.empty().
>>>>>>>>>>      */
>>>>>>>>>>     Optional<JobClient> getJobClient();
>>>>>>>>>> }
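The `Optional` contract proposed here, a `JobClient` only for statements that
actually submit a job, can be sketched as follows. Both classes are
illustrative stand-ins; Flink's real `JobClient` exposes the job id, status,
cancellation, and so on:

```java
import java.util.Optional;

// Illustrative stand-in for Flink's JobClient.
class JobClientStub { }

// Sketch of the proposed TableResult: async DQL/DML would carry a JobClient;
// DDL and the like would not, since no Flink job is submitted for them.
class TableResultSketch {
    private final JobClientStub jobClient; // null when no job was submitted

    TableResultSketch(JobClientStub jobClient) {
        this.jobClient = jobClient;
    }

    Optional<JobClientStub> getJobClient() {
        return Optional.ofNullable(jobClient);
    }

    public static void main(String[] args) {
        TableResultSketch ddl = new TableResultSketch(null);                // e.g. CREATE TABLE
        TableResultSketch dml = new TableResultSketch(new JobClientStub()); // e.g. async INSERT
        System.out.println(ddl.getJobClient().isPresent()); // false
        System.out.println(dml.getJobClient().isPresent()); // true
    }
}
```

A caller that wants sync behavior can then wait on the returned client when it
is present, while DDL results complete immediately with an empty `Optional`.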
>>>>>>>>>>
>>>>>>>>>> what do you think?
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Godfrey
>>>>>>>>>>
>>>>>>>>>> Timo Walther <[hidden email]> wrote on Thu, 26 Mar 2020 at 21:15:
>>>>>>>>>>
>>>>>>>>>>> Hi Godfrey,
>>>>>>>>>>>
>>>>>>>>>>> executing streaming queries must be our top priority because this
>>>>>>>>>>> is what distinguishes Flink from competitors. If we change the
>>>>>>>>>>> execution behavior, we should think about the other cases as well
>>>>>>>>>>> to not break the API a third time.
>>>>>>>>>>>
>>>>>>>>>>> I fear that just having an async execute method will not be
>>>>>>>>>>> enough, because users should be able to mix streaming and batch
>>>>>>>>>>> queries in a unified scenario.
>>>>>>>>>>>
>>>>>>>>>>> If I remember it correctly, we had some discussions in the past
>>>>>>>>>>> about
>>>>>>>>>>> what decides about the execution mode of a query. Currently, we
>>>>>>>>>>> would
>>>>>>>>>>> like to let the query decide, not derive it from the sources.
>>>>>>>>>>>
>>>>>>>>>>> So I could imagine a multiline pipeline as:
>>>>>>>>>>>
>>>>>>>>>>> USE CATALOG 'mycat';
>>>>>>>>>>> INSERT INTO t1 SELECT * FROM s;
>>>>>>>>>>> INSERT INTO t2 SELECT * FROM s JOIN t1 EMIT STREAM;
>>>>>>>>>>>
>>>>>>>>>>> For executeMultilineSql():
>>>>>>>>>>>
>>>>>>>>>>> sync because regular SQL
>>>>>>>>>>> sync because regular Batch SQL
>>>>>>>>>>> async because Streaming SQL
>>>>>>>>>>>
>>>>>>>>>>> For executeAsyncMultilineSql():
>>>>>>>>>>>
>>>>>>>>>>> async because everything should be async
>>>>>>>>>>> async because everything should be async
>>>>>>>>>>> async because everything should be async
>>>>>>>>>>>
>>>>>>>>>>> What we should not start for executeAsyncMultilineSql():
>>>>>>>>>>>
>>>>>>>>>>> sync because DDL
>>>>>>>>>>> async because everything should be async
>>>>>>>>>>> async because everything should be async
>>>>>>>>>>>
>>>>>>>>>>> What are you thoughts here?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Timo
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 26.03.20 12:50, godfrey he wrote:
>>>>>>>>>>>> Hi Timo,
>>>>>>>>>>>>
>>>>>>>>>>>> I agree with you that streaming queries mostly need async
>>>>>>>>>>>> execution.
>>>>>>>>>>>> In fact, our original plan was to only introduce sync methods in
>>>>>>>>>>>> this FLIP, and async methods (like "executeSqlAsync") would be
>>>>>>>>>>>> introduced in the future, as mentioned in the appendix.
>>>>>>>>>>>>
>>>>>>>>>>>> Maybe the async methods also need to be considered in this FLIP.
>>>>>>>>>>>>
>>>>>>>>>>>> I think sync methods are also useful for streaming, as they can
>>>>>>>>>>>> be used to run bounded sources.
>>>>>>>>>>>> Maybe we should check whether all sources are bounded in sync
>>>>>>>>>>>> execution mode.
>>>>>>>>>>>>
>>>>>>>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>>>>>>>> multiline files. Because the first INSERT INTO would block the
>>>>>>>>>>>>> further execution.
>>>>>>>>>>>> Agree with you; we need an async method to submit multiline
>>>>>>>>>>>> files, and the files should be limited such that DQL and DML
>>>>>>>>>>>> always come at the end for streaming.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Godfrey
>>>>>>>>>>>>
>>>>>>>>>>>> Timo Walther <[hidden email]> wrote on Thu, 26 Mar 2020 at 16:29:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi Godfrey,
>>>>>>>>>>>>>
>>>>>>>>>>>>> having control over the job after submission is a requirement
>>>>>>>>>>>>> that was requested frequently (some examples are [1], [2]).
>>>>>>>>>>>>> Users would like to get insights about the running or completed
>>>>>>>>>>>>> job, including the jobId, jobGraph etc.; the JobClient
>>>>>>>>>>>>> summarizes these properties.
>>>>>>>>>>>>>
>>>>>>>>>>>>> It is good to have a discussion about synchronous/asynchronous
>>>>>>>>>>>>> submission now to have a complete execution picture.
>>>>>>>>>>>>>
>>>>>>>>>>>>> I thought we submit streaming queries mostly async and just wait
>>>>>>>>>>>>> for the successful submission. If we block for streaming
>>>>>>>>>>>>> queries, how can we collect() or print() results?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Also, if we block for streaming queries, we could never support
>>>>>>>>>>>>> multiline files. Because the first INSERT INTO would block the
>>>>>>>>>>>>> further execution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> If we decide to block entirely on streaming queries, we need the
>>>>>>>>>>>>> async execution methods in the design already. However, I would
>>>>>>>>>>>>> rather go for non-blocking streaming queries. Also with the
>>>>>>>>>>>>> `EMIT STREAM` keyword in mind that we might add to SQL
>>>>>>>>>>>>> statements soon.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Regards,
>>>>>>>>>>>>> Timo
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1] https://issues.apache.org/jira/browse/FLINK-16761
>>>>>>>>>>>>> [2] https://issues.apache.org/jira/browse/FLINK-12214
>>>>>>>>>>>>>
