Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API


dwysakowicz

Hi all,

@NiYanchun Thank you for reporting this. Yes, I think we could improve the behaviour of the JSON format.

@Jark First of all, I do agree that we could/should improve the "user-friendliness" of the JSON format (and unify the behaviour across text-based formats). I am not sure, though, that it is as simple as just ignoring the time zone here.

My suggestion would rather be to apply the logic of parsing a SQL timestamp literal (if the expected type is of LogicalTypeFamily.TIMESTAMP), which would also derive the "stored" type of the timestamp (either WITHOUT TIMEZONE or WITH TIMEZONE), and then apply a proper SQL conversion. Therefore:

parsed type      | requested type      | behaviour
-----------------+---------------------+----------------------------------------------------------------------------
WITHOUT TIMEZONE | WITH TIMEZONE       | store the local timezone with the data
WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the data, interpret the time in local timezone
WITH TIMEZONE    | WITH LOCAL TIMEZONE | convert the timestamp to local timezone and drop the time zone information
WITH TIMEZONE    | WITHOUT TIMEZONE    | drop the time zone information

It might just boil down to what you said: "being more lenient with regards to parsing the time zone". Nevertheless, I think the behaviour is better defined this way, especially when converting between representations with and without a time zone.
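
The four rows of the matrix can be sketched with plain java.time types. This is only an illustration under stated assumptions, not Flink's implementation: the class `TimestampConversionSketch`, the methods `parseLiteral` and `convert`, the `Requested` enum, and the lenient formatter are all hypothetical names invented for the example, and `localZone` stands in for whatever "local time zone" the format would use.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;

class TimestampConversionSketch {

    /** Requested (declared) type of the field; hypothetical names. */
    enum Requested { WITHOUT_TIME_ZONE, WITH_TIME_ZONE, WITH_LOCAL_TIME_ZONE }

    // Lenient literal: date, 'T' or space separator, time, optional offset.
    private static final DateTimeFormatter LITERAL = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .optionalStart().appendLiteral('T').optionalEnd()
            .optionalStart().appendLiteral(' ').optionalEnd()
            .append(DateTimeFormatter.ISO_LOCAL_TIME)
            .optionalStart().appendOffsetId().optionalEnd()
            .toFormatter();

    /** Derives the "stored" type: OffsetDateTime if an offset was present, else LocalDateTime. */
    static Object parseLiteral(String text) {
        return LITERAL.parseBest(text, OffsetDateTime::from, LocalDateTime::from);
    }

    /** Applies one row of the matrix to a parsed value. */
    static Object convert(Object parsed, Requested requested, ZoneId localZone) {
        if (parsed instanceof LocalDateTime) {
            LocalDateTime ldt = (LocalDateTime) parsed;
            switch (requested) {
                case WITH_TIME_ZONE:
                    // store the local time zone with the data
                    return ldt.atZone(localZone).toOffsetDateTime();
                default:
                    // WITH LOCAL TIME ZONE: data unchanged, interpreted in the local zone
                    return ldt;
            }
        }
        OffsetDateTime odt = (OffsetDateTime) parsed;
        switch (requested) {
            case WITH_LOCAL_TIME_ZONE:
                // convert to the local zone, then drop the zone information
                return odt.atZoneSameInstant(localZone).toLocalDateTime();
            case WITHOUT_TIME_ZONE:
                // drop the zone information, keep the wall-clock value as written
                return odt.toLocalDateTime();
            default:
                return odt;
        }
    }

    public static void main(String[] args) {
        ZoneId local = ZoneId.of("+08:00"); // example local zone
        Object parsed = parseLiteral("2019-07-09T02:02:00.040Z");
        System.out.println(convert(parsed, Requested.WITH_LOCAL_TIME_ZONE, local));
    }
}
```

Deriving the stored type first keeps the parsing step independent of the declared type; only the second step depends on what the schema requests.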

An implementation note: I think we should base the implementation on the DataTypes rather than going back to TypeInformation.

I would still try to keep the RFC 3339 compatibility mode, but maybe in that mode it would make sense not to support any types WITHOUT TIMEZONE? It would be enabled with a switch (disabled by default). As I understand the RFC, making the time zone mandatory is a big part of the standard, as it makes time types unambiguous.
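
A rough sketch of how such a switch could behave, assuming a hypothetical class `Rfc3339SwitchSketch` with a boolean flag `rfc3339Strict` (neither exists in Flink). `DateTimeFormatter.ISO_OFFSET_DATE_TIME` is used here only as a stand-in for an RFC 3339 parser, and the SQL-style pattern is likewise an assumption.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;

class Rfc3339SwitchSketch {

    // SQL-style literal without a zone (assumed layout, millisecond fraction optional).
    private static final DateTimeFormatter SQL_LITERAL =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss[.SSS]");

    // Hypothetical switch; the proposal has it disabled by default.
    private final boolean rfc3339Strict;

    Rfc3339SwitchSketch(boolean rfc3339Strict) {
        this.rfc3339Strict = rfc3339Strict;
    }

    TemporalAccessor parse(String text) {
        if (rfc3339Strict) {
            // Strict mode: the offset is mandatory, zone-less input is rejected.
            return OffsetDateTime.parse(text, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        }
        try {
            // Lenient mode: accept input with an offset ...
            return OffsetDateTime.parse(text, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        } catch (DateTimeParseException e) {
            // ... and fall back to a zone-less SQL-style literal.
            return LocalDateTime.parse(text, SQL_LITERAL);
        }
    }
}
```

With the switch on, `parse("2019-07-09 02:02:00.040")` throws a `DateTimeParseException`; with it off, the same text yields a `LocalDateTime`.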

What do you think?

PS: I cross-posted this to the dev ML.

Best,

Dawid


On 26/02/2020 03:45, Jark Wu wrote:
Yes, I'm also in favor of loosening the datetime format constraint.
I guess most users don't know that there is a JSON standard that follows RFC 3339.

Best,
Jark

On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote:

Yes, these type definitions are general. As a user/developer, I would support “loosen it for usability”. If not, some explanation about JSON could be added.



 Original Message 
Sender: Jark Wu<[hidden email]>
Recipient: Outlook<[hidden email]>; Dawid Wysakowicz<[hidden email]>
Cc: godfrey he<[hidden email]>; Leonard Xu<[hidden email]>; user<[hidden email]>
Date: Wednesday, Feb 26, 2020 09:55
Subject: Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Hi Outlook,

The explanation in DataTypes is correct; it is compliant with the SQL standard. The problem is that JsonRowDeserializationSchema only supports RFC 3339.
On the other hand, CsvRowDeserializationSchema can parse "2019-07-09 02:02:00.040".

So the question is: shall we insist on the RFC 3339 "standard", or shall we loosen it for usability?
What do you think, [hidden email]?

Best,
Jark

On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote:

Thanks Godfrey and Leonard, I tried your suggestions and the result is OK.

BTW, if only this format is going to be accepted for a long time, I think the docs of the TIME and TIMESTAMP methods in `org.apache.flink.table.api.DataTypes` should be updated, because they currently do not describe what the methods really support. For example:


```
/**
 * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
 *
 * <p>An instance consists of {@code hour:minute:second} with up to second precision
 * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
 *
 * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
 * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
 *
 * @see #TIME(int)
 * @see TimeType
 */
public static DataType TIME() {
    return new AtomicDataType(new TimeType());
}
```


Thanks again.


 Original Message 
Sender: Leonard Xu<[hidden email]>
Recipient: godfrey he<[hidden email]>
Cc: Outlook<[hidden email]>; user<[hidden email]>
Date: Tuesday, Feb 25, 2020 22:56
Subject: Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

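Hi Outlook,
Godfrey is right, you should follow the JSON format [1] when you parse your JSON message.
You can use the following code to produce a JSON date-time string:
```
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
dateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); // 'Z' is a literal, so format in UTC
String jsonSchemaDate = dateFormat.format(new Date());
```

[1] https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times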

On Feb 25, 2020, at 22:15, godfrey he <[hidden email]> wrote:

Hi, I found that JsonRowDeserializationSchema only supports date-time values with a time zone, according to RFC 3339. So you need to add a time zone to the time data (like 14:02:00Z) and to the timestamp data (like 2019-07-09T02:02:00.040Z). Hope this helps.
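
If the producer side can be changed, the values from the original post could be rewritten into this form with java.time. The sketch below is an illustration only: the class and method names are made up, and it assumes the existing values are already UTC wall-clock times, which the thread does not state.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

class Rfc3339Producer {

    // Input layout as in the original post, e.g. "2019-07-09 02:02:00.040".
    private static final DateTimeFormatter SQL_TIMESTAMP =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    // Output layout with 'T' separator and offset; XXX prints "Z" for UTC.
    private static final DateTimeFormatter RFC3339_TIMESTAMP =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSXXX");

    /** Appends the UTC designator to a zone-less time such as "14:02:00". */
    static String timeToRfc3339(String sqlTime) {
        return LocalTime.parse(sqlTime)
                .atOffset(ZoneOffset.UTC)
                .format(DateTimeFormatter.ISO_OFFSET_TIME);
    }

    /** Rewrites a zone-less SQL-style timestamp as a UTC date-time string. */
    static String timestampToRfc3339(String sqlTimestamp) {
        return LocalDateTime.parse(sqlTimestamp, SQL_TIMESTAMP)
                .atOffset(ZoneOffset.UTC)
                .format(RFC3339_TIMESTAMP);
    }
}
```

If the source values are in another zone, `ZoneOffset.UTC` would have to be replaced by that zone's offset; otherwise the `Z` suffix would mislabel the data.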

Bests,
godfrey

On Tue, Feb 25, 2020 at 5:49 PM, Outlook <[hidden email]> wrote:
By the way, my Flink version is 1.10.0.

 Original Message 
Sender: Outlook<[hidden email]>
Recipient: user<[hidden email]>
Date: Tuesday, Feb 25, 2020 17:43
Subject: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Hi all,

I read JSON data from Kafka and print it to the console. When I do this, errors occur during time/timestamp deserialization.

JSON data in Kafka:

```
{
"server_date": "2019-07-09",
"server_time": "14:02:00",
"reqsndtime_c": "2019-07-09 02:02:00.040"
}
```

Flink code:

```
bsTableEnv.connect(
        new Kafka()
            .version("universal")
            .topic("xxx")
            .property("bootstrap.servers", "localhost:9092")
            .property("zookeeper.connect", "localhost:2181")
            .property("group.id", "g1")
            .startFromEarliest()
    ).withFormat(
        new Json()
            .failOnMissingField(false)
    ).withSchema(
        new Schema()
            .field("server_date", DataTypes.DATE())
            .field("server_time", DataTypes.TIME())
            .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
    ).inAppendMode()
    .createTemporaryTable("xxx");
```


server_date is OK, but server_time with DataTypes.TIME() and reqsndtime_c with DataTypes.TIMESTAMP(3) cause errors. If I change them to DataTypes.STRING(), everything is OK.
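
With the STRING workaround, the three fields can still be turned into typed values afterwards. The helper below is a minimal sketch using only java.time; the class and method names are hypothetical, and where the conversion would run in the job (for example in a user-defined function) is left open.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

class StringFieldParsing {

    // Layout of reqsndtime_c in the sample record: space separator, millisecond fraction.
    private static final DateTimeFormatter TIMESTAMP_3 =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");

    /** Parses server_date, e.g. "2019-07-09". */
    static LocalDate parseServerDate(String value) {
        return LocalDate.parse(value);
    }

    /** Parses server_time, e.g. "14:02:00" (no time zone required). */
    static LocalTime parseServerTime(String value) {
        return LocalTime.parse(value);
    }

    /** Parses reqsndtime_c, e.g. "2019-07-09 02:02:00.040". */
    static LocalDateTime parseReqSndTime(String value) {
        return LocalDateTime.parse(value, TIMESTAMP_3);
    }
}
```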

Error message:
```
Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
at cn.com.agree.Main.main(Main.java:122)
Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
at akka.dispatch.OnComplete.internal(Future.scala:264)
at akka.dispatch.OnComplete.internal(Future.scala:261)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
... 31 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
... 4 more
Caused by: java.io.IOException: Failed to deserialize JSON object.
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
Caused by: java.time.format.DateTimeParseException: Text '14:02:00' could not be parsed at index 8
at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
... 7 more

Process finished with exit code 1
```

reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception. According to the docs, the value range of DataTypes.TIME() is {@code 00:00:00} to {@code 23:59:59}, and the value range of DataTypes.TIMESTAMP is {@code 0000-01-01 00:00:00.000000000} to {@code 9999-12-31 23:59:59.999999999}. My values are within these ranges, so I don't know why this fails. I also read that this might be a bug in Java 8, so I switched the JDK to 11, but the error still occurs.
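
The innermost exception can be reproduced outside Flink. The sketch below uses `DateTimeFormatter.ISO_OFFSET_TIME` as a stand-in for a time format that requires an offset (an assumption; the formatter Flink actually uses may differ). Index 8 is the position right after `14:02:00`, where the parser expects an offset such as `Z`, so the failure comes from the missing time zone rather than from the value range or the JDK version.

```java
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

class ParseErrorRepro {

    /** Returns the index at which parsing failed, or -1 if the text parsed successfully. */
    static int errorIndex(String text) {
        try {
            OffsetTime.parse(text, DateTimeFormatter.ISO_OFFSET_TIME);
            return -1;
        } catch (DateTimeParseException e) {
            return e.getErrorIndex();
        }
    }

    public static void main(String[] args) {
        System.out.println(errorIndex("14:02:00"));  // 8: offset missing at end of text
        System.out.println(errorIndex("14:02:00Z")); // -1: parses once the offset is present
    }
}
```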

Can someone help me? Thanks in advance.


Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Jark Wu-2
Hi Dawid,

I agree with you. If we want to loosen the format constraint, the
important piece is the conversion matrix.

The conversion matrix you listed makes sense to me. From my understanding,
there should be 6 combinations.
We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
TIMEZONE to make the matrix complete.
When the community reaches an agreement on this, we should write it down in
the documentation and follow the matrix in all text-based formats.

Regarding the RFC 3339 compatibility mode switch, it also sounds good to
me.

Best,
Jark

On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <[hidden email]>
wrote:

> Hi all,
>
> @NiYanchun Thank you for reporting this. Yes I think we could improve the
> behaviour of the JSON format.
>
> @Jark First of all I do agree we could/should improve the
> "user-friendliness" of the JSON format (and unify the behavior across text
> based formats). I am not sure though if it is as simple as just ignore the
> time zone here.
>
> My suggestion would be rather to apply the logic of parsing a SQL
> timestamp literal (if the expected type is of LogicalTypeFamily.TIMESTAMP),
> which would actually also derive the "stored" type of the timestamp (either
> WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql conversion.
> Therefore if the
>
> parsed type                 |        requested type            | behaviour
>
> WITHOUT TIMEZONE    |     WITH TIMEZONE             | store the local
> timezone with the data
>
> WITHOUT TIMEZONE    |     WITH LOCAL TIMEZONE  | do nothing in the data,
> interpret the time in local timezone
>
> WITH TIMEZONE          |     WITH LOCAL TIMEZONE   | convert the timestamp
> to local timezone and drop the time zone information
>
> WITH TIMEZONE          |     WITHOUT TIMEZONE       | drop the time zone
> information
>
> It might just boil down to what you said "being more lenient with regards
> to parsing the time zone". Nevertheless I think this way it is a bit better
> defined behaviour, especially as it has a defined behaviour when converting
> between representation with or without time zone.
>
> An implementation note. I think we should aim to base the implementation
> on the DataTypes already rather than going back to the TypeInformation.
>
> I would still try to leave the RFC 3339 compatibility mode, but maybe for
> that mode it would make sense to not support any types WITHOUT TIMEZONE?
> This would be enabled with a switch (disabled by default). As I understand
> the RFC, making the time zone mandatory is actually a big part of the
> standard as it makes time types unambiguous.
>
> What do you think?
>
> Ps. I cross posted this on the dev ML.
>
> Best,
>
> Dawid
>
>
> On 26/02/2020 03:45, Jark Wu wrote:
>
> Yes, I'm also in favor of loosen the datetime format constraint.
> I guess most of the users don't know there is a JSON standard which
> follows RFC 3339.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote:
>
>> Yes, these Types definition are general. As a user/developer, I would
>> support “loosen it for usability”. If not, may add some explanation
>> about JSON.
>>
>>
>>
>>  Original Message
>> *Sender:* Jark Wu<[hidden email]>
>> *Recipient:* Outlook<[hidden email]>; Dawid Wysakowicz<
>> [hidden email]>
>> *Cc:* godfrey he<[hidden email]>; Leonard Xu<[hidden email]>;
>> user<[hidden email]>
>> *Date:* Wednesday, Feb 26, 2020 09:55
>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>
>> Hi Outlook,
>>
>> The explanation in DataTypes is correct, it is compliant to SQL standard.
>> The problem is that JsonRowDeserializationSchema only support  RFC-3339.
>> On the other hand, CsvRowDeserializationSchema supports to parse
>> "2019-07-09 02:02:00.040".
>>
>> So the question is shall we insist on the RFC-3339 "standard"? Shall we
>> loosen it for usability?
>> What do you think @Dawid Wysakowicz <[hidden email]> ?
>>
>> Best,
>> Jark
>>
>> On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote:
>>
>>> Thanks Godfrey and Leonard, I tried your answers, result is OK.
>>>
>>>
>>> BTW, I think if only accept such format for a long time, the  TIME and
>>> TIMESTAMP methods' doc in `org.apache.flink.table.api.DataTypes` may be
>>> better to update,
>>>
>>> because the document now is not what the method really support. For
>>> example,
>>>
>>>
>>> ```
>>> /**
>>> * Data type of a time WITHOUT time zone {@code TIME} with no fractional
>>> seconds by default.
>>> *
>>> * <p>An instance consists of {@code hour:minute:second} with up to
>>> second precision
>>> * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
>>> *
>>> * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61)
>>> are not supported as the
>>> * semantics are closer to {@link java.time.LocalTime}. A time WITH time
>>> zone is not provided.
>>> *
>>> * @see #TIME(int)
>>> * @see TimeType
>>> */
>>> public static DataType TIME() {
>>> return new AtomicDataType(new TimeType());
>>>
>>> }```
>>>
>>>
>>> Thanks again.
>>>
>>>  Original Message
>>> *Sender:* Leonard Xu<[hidden email]>
>>> *Recipient:* godfrey he<[hidden email]>
>>> *Cc:* Outlook<[hidden email]>; user<[hidden email]>
>>> *Date:* Tuesday, Feb 25, 2020 22:56
>>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>
>>> Hi,Outlook
>>> Godfrey is right, you should follow the json format[1] when you parse
>>> your json message.
>>> You can use following code to produce a json data-time String.
>>> ```
>>>
>>> Long time = System.currentTimeMillis();DateFormat dateFormat =  new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");Date date = new Date(time);String jsonSchemaDate = dateFormat.format(date);
>>>
>>> ```
>>> [1]
>>> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
>>>
>>> 在 2020年2月25日,22:15,godfrey he <[hidden email]> 写道:
>>>
>>> hi, I find that JsonRowDeserializationSchema only supports date-time
>>> with timezone according to RFC 3339. So you need add timezone to time data
>>> (like 14:02:00Z) and timestamp data(2019-07-09T02:02:00.040Z). Hope it can
>>> help you.
>>>
>>> Bests,
>>> godfrey
>>>
>>> Outlook <[hidden email]> 于2020年2月25日周二 下午5:49写道:
>>>
>>>> By the way, my flink version is 1.10.0.
>>>>
>>>>  Original Message
>>>> *Sender:* Outlook<[hidden email]>
>>>> *Recipient:* user<[hidden email]>
>>>> *Date:* Tuesday, Feb 25, 2020 17:43
>>>> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>>
>>>> Hi all,
>>>>
>>>> I read json data from kafka, and print to console. When I do this, some
>>>> error occurs when time/timestamp deserialization.
>>>>
>>>> json data in Kafka:
>>>>
>>>> ```
>>>> {
>>>> "server_date": "2019-07-09",
>>>> "server_time": "14:02:00",
>>>> "reqsndtime_c": "2019-07-09 02:02:00.040"
>>>> }
>>>> ```
>>>>
>>>> flink code:
>>>>
>>>> ```
>>>> bsTableEnv.connect(
>>>> new Kafka()
>>>> .version("universal")
>>>> .topic("xxx")
>>>> .property("bootstrap.servers", "localhost:9092")
>>>> .property("zookeeper.connect", "localhost:2181")
>>>> .property("group.id", "g1")
>>>> .startFromEarliest()
>>>> ).withFormat(
>>>> new Json()
>>>> .failOnMissingField(false)
>>>> ).withSchema(
>>>> new Schema()
>>>> .field("server_date", DataTypes.DATE())
>>>> .field("server_time", DataTypes.TIME())
>>>> .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
>>>> ).inAppendMode()
>>>> .createTemporaryTable("xxx”);
>>>> ```
>>>>
>>>>
>>>> server_date with format  is ok, but server_time with  DataTypes.DATE()
>>>> and reqsndtime_c with DataTypes.TIMESTAMP(3) cause error.  If I change them
>>>> to DataTypes.STRING(), everything will be OK.
>>>>
>>>> Error message:
>>>> ```
>>>> Exception in thread "main" java.util.concurrent.ExecutionException:
>>>> org.apache.flink.client.program.ProgramInvocationException: Job failed
>>>> (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>>> at
>>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>> at
>>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>>>> at
>>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>>>> at
>>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>> at
>>>> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>>>> at
>>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>>>> at cn.com.agree.Main.main(Main.java:122)
>>>> Caused by: org.apache.flink.client.program.ProgramInvocationException:
>>>> Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>>> at
>>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>>> at
>>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>>> at
>>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>> at
>>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> at
>>>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>>> at
>>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>>> at
>>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> at
>>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>>> at
>>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> at
>>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> at
>>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> at
>>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>>>> execution failed.
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>>> at
>>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>>>> ... 31 more
>>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>>> suppressed by NoRestartBackoffTimeStrategy
>>>> at
>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>>>> at
>>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>>>> at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>>> at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>>>> at
>>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>>>> at
>>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>>>> at
>>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> at
>>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> at
>>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> at
>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> ... 4 more
>>>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>>>> at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>>>> at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>>>> at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>>> Caused by: java.time.format.DateTimeParseException: Text '14:02:00' could not be parsed at index 8
>>>> at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>> at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>> ... 7 more
>>>>
>>>> Process finished with exit code 1
>>>> ```
>>>>
>>>> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception.
>>>> According to the docs, DataTypes.TIME() values range from
>>>> {@code 00:00:00} to {@code 23:59:59}, and DataTypes.TIMESTAMP values
>>>> range from {@code 0000-01-01 00:00:00.000000000} to
>>>> {@code 9999-12-31 23:59:59.999999999}. My values are within those
>>>> ranges, so I don't know why parsing fails. I read that this might be a
>>>> bug in Java 8, so I switched to JDK 11, but the error still occurs.
>>>>
>>>> Can someone help me? Thanks in advance.
>>>>
>>>
>>>

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Jingsong Li
Thanks all for your discussion.

Hi Dawid,

+1 to apply the logic of parsing a SQL timestamp literal.

I don't fully understand the matrix you listed. Should it follow the
semantics of SQL CAST?
Do you mean these are implicit casts performed by the JSON parser?
I have doubts about that, because these implicit casts are not supported
in LogicalTypeCasts, and it is hard to understand what is going on when
they occur silently.

How about adding a "timestampFormat" property to the JSON parser? Its default
value would be the SQL timestamp literal format, and users could configure it.
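
To illustrate the "timestampFormat" idea, here is a minimal sketch using only java.time. The class name `TimestampFormatSketch` and its constructor argument are hypothetical and exist only for this example; this is not an existing Flink API, and the SQL-literal default shown is an assumption about what such a default could look like.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

/** Hypothetical sketch of a configurable timestamp format; not Flink code. */
final class TimestampFormatSketch {

    // Assumed default: SQL timestamp literal layout "yyyy-MM-dd HH:mm:ss[.fraction]".
    static final DateTimeFormatter SQL_TIMESTAMP = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral(' ')
            .appendPattern("HH:mm:ss")
            .optionalStart()
            .appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true)
            .optionalEnd()
            .toFormatter();

    private final DateTimeFormatter formatter;

    /** A null pattern selects the assumed SQL default; otherwise the user's pattern is used. */
    TimestampFormatSketch(String timestampFormat) {
        this.formatter = timestampFormat == null
                ? SQL_TIMESTAMP
                : DateTimeFormatter.ofPattern(timestampFormat);
    }

    /** Parses the text into a timestamp WITHOUT time zone. */
    LocalDateTime parse(String text) {
        return LocalDateTime.parse(text, formatter);
    }

    public static void main(String[] args) {
        // Default: the value from the original report parses as-is.
        System.out.println(new TimestampFormatSketch(null).parse("2019-07-09 02:02:00.040"));
        // User-configured pattern; note that 'Z' is matched as a literal here, not as an offset.
        System.out.println(new TimestampFormatSketch("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
                .parse("2019-07-09T02:02:00.040Z"));
    }
}
```

With a single configured pattern, every record in the topic has to use the same layout, which is the main limitation of this approach.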

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 6:39 PM Jark Wu <[hidden email]> wrote:

> Hi Dawid,
>
> I agree with you. If we want to loosen the format constraint, the
> important piece is the conversion matrix.
>
> The conversion matrix you listed makes sense to me. From my understanding,
> there should be 6 combinations.
> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
> TIMEZONE to make the matrix complete.
> Once the community reaches an agreement on this, we should write it down in
> the documentation and follow the matrix in all text-based formats.
>
> Regarding the RFC 3339 compatibility mode switch, it also sounds good to
> me.
>
> Best,
> Jark
>


--
Best, Jingsong Lee

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Jark Wu-2
Hi Jingsong,

I don't think it should follow SQL CAST semantics, because it happens
outside of SQL: it takes place in the connectors, which convert user or
external formats into SQL types.
I also suspect that "timestampFormat" may not work in some cases, because
timestamp formats may vary and be mixed within a single topic.
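
As a rough illustration of handling mixed formats in the connector, the sketch below parses leniently, detects whether the text carried a time zone offset, and then converts to the requested type along the lines of the matrix discussed earlier in the thread. Everything here is an assumption for the sake of the example: the class `LenientTimestampSketch`, its method names, and the choice of `Instant` to represent a WITH LOCAL TIMEZONE value are hypothetical and not Flink code.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;
import java.time.temporal.TemporalAccessor;

/** Hypothetical sketch of lenient timestamp parsing in a format; not Flink code. */
final class LenientTimestampSketch {

    // Accepts 'T' or ' ' between date and time, an optional fraction and an optional offset.
    // Because both separators are optional, this is deliberately more permissive than
    // either RFC 3339 or the SQL literal format.
    static final DateTimeFormatter LENIENT = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .optionalStart().appendLiteral('T').optionalEnd()
            .optionalStart().appendLiteral(' ').optionalEnd()
            .appendPattern("HH:mm:ss")
            .optionalStart().appendFraction(ChronoField.NANO_OF_SECOND, 1, 9, true).optionalEnd()
            .optionalStart().appendOffsetId().optionalEnd()
            .toFormatter();

    /** Derives the "parsed type": OffsetDateTime if an offset was present, else LocalDateTime. */
    static TemporalAccessor parse(String text) {
        return LENIENT.parseBest(text, OffsetDateTime::from, LocalDateTime::from);
    }

    /** Requested type WITHOUT TIMEZONE: drop the time zone information if there is any. */
    static LocalDateTime toTimestamp(String text) {
        TemporalAccessor parsed = parse(text);
        return parsed instanceof OffsetDateTime
                ? ((OffsetDateTime) parsed).toLocalDateTime()
                : (LocalDateTime) parsed;
    }

    /**
     * Requested type WITH LOCAL TIMEZONE: use the offset when present, otherwise
     * interpret the wall-clock time in the given session zone.
     */
    static Instant toLocalZonedTimestamp(String text, ZoneId sessionZone) {
        TemporalAccessor parsed = parse(text);
        return parsed instanceof OffsetDateTime
                ? ((OffsetDateTime) parsed).toInstant()
                : ((LocalDateTime) parsed).atZone(sessionZone).toInstant();
    }

    public static void main(String[] args) {
        // Both layouts from the thread can appear in the same topic.
        System.out.println(toTimestamp("2019-07-09 02:02:00.040"));
        System.out.println(toTimestamp("2019-07-09T02:02:00.040Z"));
        System.out.println(toLocalZonedTimestamp("2019-07-09 02:02:00", ZoneId.of("Asia/Shanghai")));
    }
}
```

The point of the sketch is that no per-topic format setting is needed: the parser decides per record whether a time zone was present and applies the matching conversion.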

Best,
Jark

>> >>>> Error message:
>> >>>> ```
>> >>>> Exception in thread "main" java.util.concurrent.ExecutionException:
>> >>>> org.apache.flink.client.program.ProgramInvocationException: Job
>> failed
>> >>>> (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>> >>>> at
>> >>>>
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>> >>>> at
>> >>>>
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>> >>>> at
>> >>>>
>> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>> >>>> at
>> >>>>
>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>> >>>> at cn.com.agree.Main.main(Main.java:122)
>> >>>> Caused by:
>> org.apache.flink.client.program.ProgramInvocationException:
>> >>>> Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>> >>>> at
>> >>>>
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>> >>>> at
>> >>>>
>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>> >>>> at
>> >>>>
>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>> >>>> at
>> >>>>
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>> >>>> at
>> >>>>
>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>> >>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>> >>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>> >>>> at
>> >>>>
>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>> >>>> at
>> >>>>
>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>> >>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>> >>>> at
>> >>>>
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>> >>>> at
>> >>>>
>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>> >>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>> >>>> at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>> >>>> at
>> >>>>
>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>> >>>> at
>> >>>>
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>> >>>> at
>> >>>>
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> >>>> at
>> >>>>
>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>> >>>> at
>> >>>>
>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>> >>>> at
>> >>>>
>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>> >>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>> >>>> at
>> >>>>
>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>> >>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>> >>>> at
>> >>>>
>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>> >>>> at
>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>> >>>> at
>> >>>>
>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job
>> >>>> execution failed.
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>> >>>> at
>> >>>>
>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>> >>>> ... 31 more
>> >>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>> >>>> suppressed by NoRestartBackoffTimeStrategy
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>> >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>>> at
>> >>>>
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>> >>>> at
>> >>>>
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>>> at java.lang.reflect.Method.invoke(Method.java:498)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>> >>>> at
>> >>>>
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>> >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>> >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>> >>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>> >>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>> >>>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>> >>>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> >>>> at
>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>> >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>> >>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>> >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>> >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>> >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>> >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>> >>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>> >>>> ... 4 more
>> >>>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>> >>>> at
>> >>>>
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>> >>>> Caused by: java.time.format.DateTimeParseException: *Text '14:02:00'
>> >>>> could not be parsed at index 8*
>> >>>> at
>> >>>>
>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>> >>>> at
>> java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>> >>>> at
>> >>>>
>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>> >>>> ... 7 more
>> >>>>
>> >>>> Process finished with exit code 1
>> >>>> ```
>> >>>>
>> >>>> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception.
>> >>>> According to the docs, DataTypes.TIME() values range from 00:00:00 to
>> >>>> 23:59:59, and DataTypes.TIMESTAMP values range from
>> >>>> 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999.
>> >>>> My values are within these ranges, so I don't know why parsing fails.
>> >>>> I read that this might be a bug in Java 8, but after switching to
>> >>>> JDK 11 the error still occurs.
>> >>>>
>> >>>> Can someone give me some help, thanks in advance.
>> >>>>
>> >>>
>> >>>
>>
>
>
> --
> Best, Jingsong Lee
>

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Jingsong Li
Hi Jark,

The matrix looks to me like SQL cast. If we need to introduce another
conversion matrix that differs from SQL cast, I don't see the benefit, and
it makes the behaviour harder to understand.
It also seems bad to silently map timestamps from different time zones to
the same value.
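
To illustrate the concern about silent conversion, here is a minimal sketch
using only java.time. It is not Flink code, and the class and method names
are made up for illustration. Two strings that denote instants eight hours
apart become equal once the zone information is dropped:

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;

// Hypothetical demo class, not part of Flink.
class DropZoneDemo {

    // "Drop the time zone information": keep only the local date-time part.
    static LocalDateTime dropZone(String rfc3339) {
        return OffsetDateTime.parse(rfc3339).toLocalDateTime();
    }

    // Compare the two strings as points on the time line.
    static boolean sameInstant(String a, String b) {
        return OffsetDateTime.parse(a).toInstant()
                .equals(OffsetDateTime.parse(b).toInstant());
    }

    public static void main(String[] args) {
        String utc = "2019-07-09T02:02:00.040Z";
        String plus8 = "2019-07-09T02:02:00.040+08:00";

        System.out.println(sameInstant(utc, plus8));                // false
        System.out.println(dropZone(utc).equals(dropZone(plus8)));  // true
    }
}
```

Nothing signals to the user that the two records were originally eight
hours apart.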

I have seen many timestamp formats: SQL, ISO, RFC. I think a
"timestampFormat" property could help users deal with these various formats.
Which approach do you think can solve all of these problems?
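
As a rough sketch of the idea: the option name "timestampFormat" comes from
this thread, but the class and the parseTimestamp method below are
hypothetical and not an existing Flink API. The default follows the SQL
timestamp literal layout, and a user-supplied pattern overrides it:

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.temporal.ChronoField;

// Hypothetical sketch, not part of Flink.
class TimestampFormatSketch {

    // Default: SQL timestamp literal, e.g. "2019-07-09 02:02:00.040",
    // with an optional fraction of up to nine digits.
    static final DateTimeFormatter SQL_TIMESTAMP = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral(' ')
            .appendPattern("HH:mm:ss")
            .optionalStart()
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .optionalEnd()
            .toFormatter();

    // timestampFormat == null means "use the default SQL literal format".
    static LocalDateTime parseTimestamp(String text, String timestampFormat) {
        DateTimeFormatter formatter = timestampFormat == null
                ? SQL_TIMESTAMP
                : DateTimeFormatter.ofPattern(timestampFormat);
        return LocalDateTime.parse(text, formatter);
    }

    public static void main(String[] args) {
        // Default format: prints 2019-07-09T02:02:00.040
        System.out.println(parseTimestamp("2019-07-09 02:02:00.040", null));
        // User-configured format: prints 2019-07-09T02:02
        System.out.println(parseTimestamp("09/07/2019 02:02:00", "dd/MM/yyyy HH:mm:ss"));
    }
}
```

A single pattern per table would not cover the case where formats are mixed
within one topic, which is the objection raised in the quoted reply below.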

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 10:45 PM Jark Wu <[hidden email]> wrote:

> Hi Jingsong,
>
> I don't think it should follow SQL CAST semantics, because it is out of
> SQL, it happens in connectors which converts users'/external's format into
> SQL types.
> I also doubt "timestampFormat" may not work in some cases, because the
> timestamp format maybe various and mixed in a topic.
>
> Best,
> Jark
>
> On Wed, 26 Feb 2020 at 22:20, Jingsong Li <[hidden email]> wrote:
>
>> Thanks all for your discussion.
>>
>> Hi Dawid,
>>
>> +1 to apply the logic of parsing a SQL timestamp literal.
>>
>> I don't fully understand the matrix your list. Should this be the
>> semantics of SQL cast?
>> Do you mean this is implicit cast in JSON parser?
>> I doubt that because these implicit casts are not support
>> in LogicalTypeCasts. And it is not so good to understand when it occur
>> silently.
>>
>> How about add "timestampFormat" property to JSON parser? Its default
>> value is SQL timestamp literal format. And user can configure this.
>>
>> Best,
>> Jingsong Lee
>>
>> On Wed, Feb 26, 2020 at 6:39 PM Jark Wu <[hidden email]> wrote:
>>
>>> Hi Dawid,
>>>
>>> I agree with you. If we want to loosen the format constraint, the
>>> important piece is the conversion matrix.
>>>
>>> The conversion matrix you listed makes sense to me. From my
>>> understanding,
>>> there should be 6 combination.
>>> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH
>>> TIMEZONE to make the matrix complete.
>>> When the community reach an agreement on this, we should write it down on
>>> the documentation and follow the matrix in all text-based formats.
>>>
>>> Regarding to the RFC 3339 compatibility mode switch, it also sounds good
>>> to
>>> me.
>>>
>>> Best,
>>> Jark
>>>
>>> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <[hidden email]>
>>> wrote:
>>>
>>> > Hi all,
>>> >
>>> > @NiYanchun Thank you for reporting this. Yes I think we could improve
>>> the
>>> > behaviour of the JSON format.
>>> >
>>> > @Jark First of all I do agree we could/should improve the
>>> > "user-friendliness" of the JSON format (and unify the behavior across
>>> text
>>> > based formats). I am not sure though if it is as simple as just ignore
>>> the
>>> > time zone here.
>>> >
>>> > My suggestion would be rather to apply the logic of parsing a SQL
>>> > timestamp literal (if the expected type is of
>>> LogicalTypeFamily.TIMESTAMP),
>>> > which would actually also derive the "stored" type of the timestamp
>>> (either
>>> > WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql
>>> conversion.
>>> > Therefore if the
>>> >
>>> > parsed type      | requested type      | behaviour
>>> > WITHOUT TIMEZONE | WITH TIMEZONE       | store the local timezone with the data
>>> > WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the data, interpret the time in local timezone
>>> > WITH TIMEZONE    | WITH LOCAL TIMEZONE | convert the timestamp to local timezone and drop the time zone information
>>> > WITH TIMEZONE    | WITHOUT TIMEZONE    | drop the time zone information
>>> >
>>> > It might just boil down to what you said "being more lenient with
>>> regards
>>> > to parsing the time zone". Nevertheless I think this way it is a bit
>>> better
>>> > defined behaviour, especially as it has a defined behaviour when
>>> converting
>>> > between representation with or without time zone.
>>> >
>>> > An implementation note. I think we should aim to base the
>>> implementation
>>> > on the DataTypes already rather than going back to the TypeInformation.
>>> >
>>> > I would still try to leave the RFC 3339 compatibility mode, but maybe
>>> for
>>> > that mode it would make sense to not support any types WITHOUT
>>> TIMEZONE?
>>> > This would be enabled with a switch (disabled by default). As I
>>> understand
>>> > the RFC, making the time zone mandatory is actually a big part of the
>>> > standard as it makes time types unambiguous.
>>> >
>>> > What do you think?
>>> >
>>> > Ps. I cross posted this on the dev ML.
>>> >
>>> > Best,
>>> >
>>> > Dawid
>>> >
>>> >
>>> > On 26/02/2020 03:45, Jark Wu wrote:
>>> >
>>> > Yes, I'm also in favor of loosen the datetime format constraint.
>>> > I guess most of the users don't know there is a JSON standard which
>>> > follows RFC 3339.
>>> >
>>> > Best,
>>> > Jark
>>> >
>>> > On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote:
>>> >
>>> >> Yes, these Types definition are general. As a user/developer, I would
>>> >> support “loosen it for usability”. If not, may add some explanation
>>> >> about JSON.
>>> >>
>>> >>
>>> >>
>>> >>  Original Message
>>> >> *Sender:* Jark Wu<[hidden email]>
>>> >> *Recipient:* Outlook<[hidden email]>; Dawid Wysakowicz<
>>> >> [hidden email]>
>>> >> *Cc:* godfrey he<[hidden email]>; Leonard Xu<[hidden email]>;
>>> >> user<[hidden email]>
>>> >> *Date:* Wednesday, Feb 26, 2020 09:55
>>> >> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>> >>
>>> >> Hi Outlook,
>>> >>
>>> >> The explanation in DataTypes is correct, it is compliant to SQL
>>> standard.
>>> >> The problem is that JsonRowDeserializationSchema only support
>>> RFC-3339.
>>> >> On the other hand, CsvRowDeserializationSchema supports to parse
>>> >> "2019-07-09 02:02:00.040".
>>> >>
>>> >> So the question is shall we insist on the RFC-3339 "standard"? Shall
>>> we
>>> >> loosen it for usability?
>>> >> What do you think @Dawid Wysakowicz <[hidden email]> ?
>>> >>
>>> >> Best,
>>> >> Jark
>>> >>
>>> >> On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote:
>>> >>
>>> >>> Thanks Godfrey and Leonard, I tried your answers, result is OK.
>>> >>>
>>> >>>
>>> >>> BTW, I think if only accept such format for a long time, the  TIME
>>> and
>>> >>> TIMESTAMP methods' doc in `org.apache.flink.table.api.DataTypes` may
>>> be
>>> >>> better to update,
>>> >>>
>>> >>> because the document now is not what the method really support. For
>>> >>> example,
>>> >>>
>>> >>>
>>> >>> ```
>>> >>> /**
>>> >>> * Data type of a time WITHOUT time zone {@code TIME} with no
>>> fractional
>>> >>> seconds by default.
>>> >>> *
>>> >>> * <p>An instance consists of {@code hour:minute:second} with up to
>>> >>> second precision
>>> >>> * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
>>> >>> *
>>> >>> * <p>Compared to the SQL standard, leap seconds (23:59:60 and
>>> 23:59:61)
>>> >>> are not supported as the
>>> >>> * semantics are closer to {@link java.time.LocalTime}. A time WITH
>>> time
>>> >>> zone is not provided.
>>> >>> *
>>> >>> * @see #TIME(int)
>>> >>> * @see TimeType
>>> >>> */
>>> >>> public static DataType TIME() {
>>> >>> return new AtomicDataType(new TimeType());
>>> >>>
>>> >>> }```
>>> >>>
>>> >>>
>>> >>> Thanks again.
>>> >>>
>>> >>>  Original Message
>>> >>> *Sender:* Leonard Xu<[hidden email]>
>>> >>> *Recipient:* godfrey he<[hidden email]>
>>> >>> *Cc:* Outlook<[hidden email]>; user<[hidden email]>
>>> >>> *Date:* Tuesday, Feb 25, 2020 22:56
>>> >>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>> >>>
>>> >>> Hi Outlook,
>>> >>> Godfrey is right, you should follow the JSON format [1] when you parse
>>> >>> your JSON message.
>>> >>> You can use the following code to produce a JSON date-time string.
>>> >>> ```
>>> >>>
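>>> >>> // Needs java.text.DateFormat, java.text.SimpleDateFormat, java.util.Date, java.util.TimeZone.
>>> >>> long time = System.currentTimeMillis();
>>> >>> DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
>>> >>> // The pattern hard-codes a literal 'Z', so the formatter has to use UTC.
>>> >>> dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>>> >>> Date date = new Date(time);
>>> >>> String jsonSchemaDate = dateFormat.format(date);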
>>> >>>
>>> >>> ```
>>> >>> [1]
>>> >>>
>>> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
>>> >>>
>>> >>> On Feb 25, 2020, at 22:15, godfrey he <[hidden email]> wrote:
>>> >>>
>>> >>> hi, I find that JsonRowDeserializationSchema only supports date-time
>>> >>> with timezone according to RFC 3339. So you need add timezone to
>>> time data
>>> >>> (like 14:02:00Z) and timestamp data(2019-07-09T02:02:00.040Z). Hope
>>> it can
>>> >>> help you.
>>> >>>
>>> >>> Bests,
>>> >>> godfrey
>>> >>>
>>> >>> On Tue, Feb 25, 2020 at 5:49 PM, Outlook <[hidden email]> wrote:
>>> >>>
>>> >>>> By the way, my flink version is 1.10.0.
>>> >>>>
>>> >>>>  Original Message
>>> >>>> *Sender:* Outlook<[hidden email]>
>>> >>>> *Recipient:* user<[hidden email]>
>>> >>>> *Date:* Tuesday, Feb 25, 2020 17:43
>>> >>>> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>> >>>>
>>> >>>> Hi all,
>>> >>>>
>>> >>>> I read json data from kafka, and print to console. When I do this,
>>> some
>>> >>>> error occurs when time/timestamp deserialization.
>>> >>>>
>>> >>>> json data in Kafka:
>>> >>>>
>>> >>>> ```
>>> >>>> {
>>> >>>> "server_date": "2019-07-09",
>>> >>>> "server_time": "14:02:00",
>>> >>>> "reqsndtime_c": "2019-07-09 02:02:00.040"
>>> >>>> }
>>> >>>> ```
>>> >>>>
>>> >>>> flink code:
>>> >>>>
>>> >>>> ```
>>> >>>> bsTableEnv.connect(
>>> >>>>     new Kafka()
>>> >>>>         .version("universal")
>>> >>>>         .topic("xxx")
>>> >>>>         .property("bootstrap.servers", "localhost:9092")
>>> >>>>         .property("zookeeper.connect", "localhost:2181")
>>> >>>>         .property("group.id", "g1")
>>> >>>>         .startFromEarliest()
>>> >>>> ).withFormat(
>>> >>>>     new Json()
>>> >>>>         .failOnMissingField(false)
>>> >>>> ).withSchema(
>>> >>>>     new Schema()
>>> >>>>         .field("server_date", DataTypes.DATE())
>>> >>>>         .field("server_time", DataTypes.TIME())
>>> >>>>         .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
>>> >>>> ).inAppendMode()
>>> >>>> .createTemporaryTable("xxx");
>>> >>>> ```
>>> >>>>
>>> >>>>
>>> >>>> server_date with DataTypes.DATE() is OK, but server_time with
>>> >>>> DataTypes.TIME() and reqsndtime_c with DataTypes.TIMESTAMP(3) cause
>>> >>>> errors. If I change them to DataTypes.STRING(), everything works.
>>> >>>>
>>> >>>> Error message:
>>> >>>> ```
>>> >>>> Exception in thread "main" java.util.concurrent.ExecutionException:
>>> >>>> org.apache.flink.client.program.ProgramInvocationException: Job
>>> failed
>>> >>>> (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>>> >>>> at cn.com.agree.Main.main(Main.java:122)
>>> >>>> Caused by:
>>> org.apache.flink.client.program.ProgramInvocationException:
>>> >>>> Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>> >>>> at
>>> >>>>
>>> java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>>> >>>> at akka.dispatch.OnComplete.internal(Future.scala:264)
>>> >>>> at akka.dispatch.OnComplete.internal(Future.scala:261)
>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>> >>>> at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>> >>>> at
>>> >>>>
>>> scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>> >>>> at
>>> >>>>
>>> scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>> >>>> at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>> >>>> at
>>> >>>>
>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>> >>>> at
>>> >>>>
>>> akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>> >>>> at
>>> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>> >>>> at
>>> scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>> >>>> at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>> >>>> at
>>> >>>>
>>> scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>> >>>> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>> >>>> at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>> >>>> at
>>> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>> >>>> at
>>> >>>>
>>> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>> >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException:
>>> Job
>>> >>>> execution failed.
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>>> >>>> ... 31 more
>>> >>>> Caused by: org.apache.flink.runtime.JobException: Recovery is
>>> >>>> suppressed by NoRestartBackoffTimeStrategy
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>>> >>>> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>> >>>> at
>>> >>>>
>>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>> >>>> at
>>> >>>>
>>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>> >>>> at java.lang.reflect.Method.invoke(Method.java:498)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>> >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>> >>>> at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>> >>>> at
>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>> >>>> at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>> >>>> at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>> >>>> at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> >>>> at
>>> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>> >>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>> >>>> at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>> >>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>> >>>> at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>> >>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>> >>>> at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>> >>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>> >>>> ... 4 more
>>> >>>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>> >>>> Caused by: java.time.format.DateTimeParseException: *Text '14:02:00'
>>> >>>> could not be parsed at index 8*
>>> >>>> at
>>> >>>>
>>> java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>> >>>> at
>>> java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>> >>>> at
>>> >>>>
>>> org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>> >>>> ... 7 more
>>> >>>>
>>> >>>> Process finished with exit code 1
>>> >>>> ```
>>> >>>>
>>> >>>> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception.
>>> >>>> According to the docs, DataTypes.TIME() values range from 00:00:00 to
>>> >>>> 23:59:59, and DataTypes.TIMESTAMP values range from
>>> >>>> 0000-01-01 00:00:00.000000000 to 9999-12-31 23:59:59.999999999.
>>> >>>> My values are within these ranges, so I don't know why parsing fails.
>>> >>>> I read that this might be a bug in Java 8, but after switching to
>>> >>>> JDK 11 the error still occurs.
>>> >>>>
>>> >>>> Can someone give me some help, thanks in advance.
>>> >>>>
>>> >>>
>>> >>>
>>>
>>
>>
>> --
>> Best, Jingsong Lee
>>
>

--
Best, Jingsong Lee

Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API

Jark Wu-2
Hi Jingsong, Dawid,

I created https://issues.apache.org/jira/browse/FLINK-16725 to track this
issue. We can continue discussion there.

Best,
Jark

On Thu, 27 Feb 2020 at 10:32, Jingsong Li <[hidden email]> wrote:

> Hi Jark,
>
> The matrix I see is SQL cast. If we need bring another conversion matrix
> that is different from SQL cast, I don't understand the benefits. It makes
> me difficult to understand.
> And It seems bad to change the timestamp of different time zones to the
> same value silently.
>
> I have seen a lot of timestamp formats,  SQL, ISO, RFC. I can think that a
> "timestampFormat" could help them to deal with various formats.
> What way do you think can solve all the problems?
>
> Best,
> Jingsong Lee
>
> On Wed, Feb 26, 2020 at 10:45 PM Jark Wu <[hidden email]> wrote:
>
>> Hi Jingsong,
>>
>> I don't think it should follow SQL CAST semantics, because it is out of
>> SQL, it happens in connectors which converts users'/external's format into
>> SQL types.
>> I also doubt "timestampFormat" may not work in some cases, because the
>> timestamp format maybe various and mixed in a topic.
>>
>> Best,
>> Jark
>>
>> On Wed, 26 Feb 2020 at 22:20, Jingsong Li <[hidden email]> wrote:
>>
>>> Thanks all for your discussion.
>>>
>>> Hi Dawid,
>>>
>>> +1 to apply the logic of parsing a SQL timestamp literal.
>>>
>>> I don't fully understand the matrix your list. Should this be the
>>> semantics of SQL cast?
>>> Do you mean this is implicit cast in JSON parser?
>>> I doubt that because these implicit casts are not support
>>> in LogicalTypeCasts. And it is not so good to understand when it occur
>>> silently.
>>>
>>> How about add "timestampFormat" property to JSON parser? Its default
>>> value is SQL timestamp literal format. And user can configure this.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>> On Wed, Feb 26, 2020 at 6:39 PM Jark Wu <[hidden email]> wrote:
>>>
>>>> Hi Dawid,
>>>>
>>>> I agree with you. If we want to loosen the format constraint, the
>>>> important piece is the conversion matrix.
>>>>
>>>> The conversion matrix you listed makes sense to me. From my
>>>> understanding,
>>>> there should be 6 combination.
>>>> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE =>
>>>> WITH
>>>> TIMEZONE to make the matrix complete.
>>>> When the community reach an agreement on this, we should write it down
>>>> on
>>>> the documentation and follow the matrix in all text-based formats.
>>>>
>>>> Regarding to the RFC 3339 compatibility mode switch, it also sounds
>>>> good to
>>>> me.
>>>>
>>>> Best,
>>>> Jark
>>>>
>>>> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <[hidden email]>
>>>> wrote:
>>>>
>>>> > Hi all,
>>>> >
>>>> > @NiYanchun Thank you for reporting this. Yes I think we could improve
>>>> > the behaviour of the JSON format.
>>>> >
>>>> > @Jark First of all I do agree we could/should improve the
>>>> > "user-friendliness" of the JSON format (and unify the behavior across
>>>> > text based formats). I am not sure though if it is as simple as just
>>>> > ignoring the time zone here.
>>>> >
>>>> > My suggestion would be rather to apply the logic of parsing a SQL
>>>> > timestamp literal (if the expected type is of
>>>> > LogicalTypeFamily.TIMESTAMP), which would actually also derive the
>>>> > "stored" type of the timestamp (either WITHOUT TIMEZONE or WITH
>>>> > TIMEZONE) and then apply a proper sql conversion. Therefore:
>>>> >
>>>> > parsed type      | requested type      | behaviour
>>>> > WITHOUT TIMEZONE | WITH TIMEZONE       | store the local timezone with the data
>>>> > WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the data, interpret the time in local timezone
>>>> > WITH TIMEZONE    | WITH LOCAL TIMEZONE | convert the timestamp to local timezone and drop the time zone information
>>>> > WITH TIMEZONE    | WITHOUT TIMEZONE    | drop the time zone information
>>>> >
>>>> > It might just boil down to what you said "being more lenient with
>>>> > regards to parsing the time zone". Nevertheless I think this way it is
>>>> > a bit better defined behaviour, especially as it has a defined
>>>> > behaviour when converting between representation with or without time
>>>> > zone.
>>>> >
>>>> > An implementation note. I think we should aim to base the
>>>> > implementation on the DataTypes already rather than going back to the
>>>> > TypeInformation.
>>>> >
>>>> > I would still try to leave the RFC 3339 compatibility mode, but maybe
>>>> > for that mode it would make sense to not support any types WITHOUT
>>>> > TIMEZONE? This would be enabled with a switch (disabled by default).
>>>> > As I understand the RFC, making the time zone mandatory is actually a
>>>> > big part of the standard as it makes time types unambiguous.
>>>> >
>>>> > What do you think?
>>>> >
>>>> > Ps. I cross posted this on the dev ML.
>>>> >
>>>> > Best,
>>>> >
>>>> > Dawid
>>>> >
>>>> >
>>>> > On 26/02/2020 03:45, Jark Wu wrote:
>>>> >
>>>> > Yes, I'm also in favor of loosening the datetime format constraint.
>>>> > I guess most users don't know there is a JSON standard which
>>>> > follows RFC 3339.
>>>> >
>>>> > Best,
>>>> > Jark
>>>> >
>>>> > On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote:
>>>> >
>>>> >> Yes, these type definitions are general. As a user/developer, I would
>>>> >> support “loosen it for usability”. If not, we may need to add some
>>>> >> explanation about JSON.
>>>> >>
>>>> >>
>>>> >>
>>>> >>  Original Message
>>>> >> *Sender:* Jark Wu<[hidden email]>
>>>> >> *Recipient:* Outlook<[hidden email]>; Dawid Wysakowicz<[hidden email]>
>>>> >> *Cc:* godfrey he<[hidden email]>; Leonard Xu<[hidden email]>;
>>>> >> user<[hidden email]>
>>>> >> *Date:* Wednesday, Feb 26, 2020 09:55
>>>> >> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>> >>
>>>> >> Hi Outlook,
>>>> >>
>>>> >> The explanation in DataTypes is correct; it is compliant with the SQL
>>>> >> standard.
>>>> >> The problem is that JsonRowDeserializationSchema only supports RFC 3339.
>>>> >> On the other hand, CsvRowDeserializationSchema supports parsing
>>>> >> "2019-07-09 02:02:00.040".
>>>> >>
>>>> >> So the question is: shall we insist on the RFC 3339 "standard"? Shall
>>>> >> we loosen it for usability?
>>>> >> What do you think @Dawid Wysakowicz <[hidden email]> ?
>>>> >>
>>>> >> Best,
>>>> >> Jark
>>>> >>
>>>> >> On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote:
>>>> >>
>>>> >>> Thanks Godfrey and Leonard, I tried your answers, and the result is OK.
>>>> >>>
>>>> >>>
>>>> >>> BTW, if only this format will be accepted for a long time, I think the
>>>> >>> docs of the TIME and TIMESTAMP methods in
>>>> >>> `org.apache.flink.table.api.DataTypes` should be updated,
>>>> >>>
>>>> >>> because the documentation currently does not match what the methods
>>>> >>> really support. For example,
>>>> >>>
>>>> >>>
>>>> >>> ```
>>>> >>> /**
>>>> >>>  * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
>>>> >>>  *
>>>> >>>  * <p>An instance consists of {@code hour:minute:second} with up to second precision
>>>> >>>  * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
>>>> >>>  *
>>>> >>>  * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
>>>> >>>  * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
>>>> >>>  *
>>>> >>>  * @see #TIME(int)
>>>> >>>  * @see TimeType
>>>> >>>  */
>>>> >>> public static DataType TIME() {
>>>> >>>     return new AtomicDataType(new TimeType());
>>>> >>> }
>>>> >>> ```
>>>> >>>
>>>> >>>
>>>> >>> Thanks again.
>>>> >>>
>>>> >>>  Original Message
>>>> >>> *Sender:* Leonard Xu<[hidden email]>
>>>> >>> *Recipient:* godfrey he<[hidden email]>
>>>> >>> *Cc:* Outlook<[hidden email]>; user<[hidden email]>
>>>> >>> *Date:* Tuesday, Feb 25, 2020 22:56
>>>> >>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>> >>>
>>>> >>> Hi Outlook,
>>>> >>> Godfrey is right, you should follow the JSON format [1] when you
>>>> >>> parse your JSON message.
>>>> >>> You can use the following code to produce a JSON date-time string.
>>>> >>> ```
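>>>> >>> // Needs java.text.DateFormat, java.text.SimpleDateFormat, java.util.Date, java.util.TimeZone.
>>>> >>> long time = System.currentTimeMillis();
>>>> >>> DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
>>>> >>> // 'Z' is a quoted literal in the pattern, so format in UTC to keep the suffix correct.
>>>> >>> dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
>>>> >>> Date date = new Date(time);
>>>> >>> String jsonSchemaDate = dateFormat.format(date);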
>>>> >>> ```
>>>> >>> [1] https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
>>>> >>>
>>>> >>> On Feb 25, 2020, at 22:15, godfrey he <[hidden email]> wrote:
>>>> >>>
>>>> >>> Hi, I find that JsonRowDeserializationSchema only supports date-time
>>>> >>> with a time zone according to RFC 3339. So you need to add a time zone
>>>> >>> to time data (like 14:02:00Z) and timestamp data
>>>> >>> (2019-07-09T02:02:00.040Z). Hope it helps.
>>>> >>>
>>>> >>> Bests,
>>>> >>> godfrey
>>>> >>>
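For anyone who wants to see the failure outside Flink: below is a minimal sketch that reproduces the "could not be parsed at index 8" message from the original report using only `java.time`. The `STRICT_TIME` formatter and the class and method names are my own stand-ins for "a formatter that insists on a trailing Z"; they are assumptions for illustration, not copied from `JsonRowDeserializationSchema`.

```java
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class Rfc3339TimeSketch {

    // Hypothetical stand-in: a time pattern that requires a literal trailing 'Z'.
    static final DateTimeFormatter STRICT_TIME = DateTimeFormatter.ofPattern("HH:mm:ss'Z'");

    // Parses the text with the strict formatter; throws DateTimeParseException on mismatch.
    static LocalTime parseStrict(String text) {
        return LocalTime.parse(text, STRICT_TIME);
    }

    // Returns the index at which parsing failed, or -1 if the text parsed fine.
    static int failureIndex(String text) {
        try {
            parseStrict(text);
            return -1;
        } catch (DateTimeParseException e) {
            return e.getErrorIndex();
        }
    }

    public static void main(String[] args) {
        // "14:02:00" has 8 characters, so the missing 'Z' is reported at index 8.
        System.out.println("14:02:00  -> failure index " + failureIndex("14:02:00"));
        System.out.println("14:02:00Z -> failure index " + failureIndex("14:02:00Z"));
    }
}
```

So the value itself is in range; the parser simply runs out of input where it expects the zone designator.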
>>>> >>> On Tue, Feb 25, 2020 at 5:49 PM, Outlook <[hidden email]> wrote:
>>>> >>>
>>>> >>>> By the way, my flink version is 1.10.0.
>>>> >>>>
>>>> >>>>  Original Message
>>>> >>>> *Sender:* Outlook<[hidden email]>
>>>> >>>> *Recipient:* user<[hidden email]>
>>>> >>>> *Date:* Tuesday, Feb 25, 2020 17:43
>>>> >>>> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
>>>> >>>>
>>>> >>>> Hi all,
>>>> >>>>
>>>> >>>> I read JSON data from Kafka and print it to the console. When I do
>>>> >>>> this, an error occurs during time/timestamp deserialization.
>>>> >>>>
>>>> >>>> json data in Kafka:
>>>> >>>>
>>>> >>>> ```
>>>> >>>> {
>>>> >>>> "server_date": "2019-07-09",
>>>> >>>> "server_time": "14:02:00",
>>>> >>>> "reqsndtime_c": "2019-07-09 02:02:00.040"
>>>> >>>> }
>>>> >>>> ```
>>>> >>>>
>>>> >>>> flink code:
>>>> >>>>
>>>> >>>> ```
>>>> >>>> bsTableEnv.connect(
>>>> >>>>     new Kafka()
>>>> >>>>         .version("universal")
>>>> >>>>         .topic("xxx")
>>>> >>>>         .property("bootstrap.servers", "localhost:9092")
>>>> >>>>         .property("zookeeper.connect", "localhost:2181")
>>>> >>>>         .property("group.id", "g1")
>>>> >>>>         .startFromEarliest()
>>>> >>>> ).withFormat(
>>>> >>>>     new Json()
>>>> >>>>         .failOnMissingField(false)
>>>> >>>> ).withSchema(
>>>> >>>>     new Schema()
>>>> >>>>         .field("server_date", DataTypes.DATE())
>>>> >>>>         .field("server_time", DataTypes.TIME())
>>>> >>>>         .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
>>>> >>>> ).inAppendMode()
>>>> >>>> .createTemporaryTable("xxx");
>>>> >>>> ```
>>>> >>>>
>>>> >>>>
>>>> >>>> server_date with DataTypes.DATE() is OK, but server_time with
>>>> >>>> DataTypes.TIME() and reqsndtime_c with DataTypes.TIMESTAMP(3) cause
>>>> >>>> errors. If I change them to DataTypes.STRING(), everything is OK.
>>>> >>>>
>>>> >>>> Error message:
>>>> >>>> ```
>>>> >>>> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>>> >>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>>> >>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>>> >>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>>>> >>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>>>> >>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>>>> >>>>     at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>>>> >>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>>>> >>>>     at cn.com.agree.Main.main(Main.java:122)
>>>> >>>> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>>>> >>>>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>>>> >>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>>>> >>>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>>>> >>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>>> >>>>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>>>> >>>>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>>>> >>>>     at akka.dispatch.OnComplete.internal(Future.scala:264)
>>>> >>>>     at akka.dispatch.OnComplete.internal(Future.scala:261)
>>>> >>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>>>> >>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>>>> >>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> >>>>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>>>> >>>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>>>> >>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>>>> >>>>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>>>> >>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>>>> >>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>>>> >>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>>>> >>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>>>> >>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>>>> >>>>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>>>> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>>>> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>>>> >>>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>>>> >>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>>>> >>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>>>> >>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> >>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> >>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> >>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>> >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>>>> >>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>>>> >>>>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>>>> >>>>     ... 31 more
>>>> >>>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>> >>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>>>> >>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>>>> >>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>>>> >>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>>>> >>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>>>> >>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>>>> >>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>>>> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>> >>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>> >>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>> >>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>>>> >>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>>>> >>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>>>> >>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>>>> >>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>>>> >>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>>>> >>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>> >>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>>>> >>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>>>> >>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> >>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>>>> >>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>>>> >>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>>>> >>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>>>> >>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>>>> >>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>>>> >>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>>>> >>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>>>> >>>>     ... 4 more
>>>> >>>> Caused by: java.io.IOException: Failed to deserialize JSON object.
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>>>> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>>>> >>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>>>> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>>>> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>>>> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>>>> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
>>>> >>>> Caused by: java.time.format.DateTimeParseException: Text '14:02:00' could not be parsed at index 8
>>>> >>>>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>>>> >>>>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>>>> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>>>> >>>>     ... 7 more
>>>> >>>>
>>>> >>>> Process finished with exit code 1
>>>> >>>> ```
>>>> >>>>
>>>> >>>> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception.
>>>> >>>> According to the doc, the DataTypes.TIME() value range is from
>>>> >>>> {@code 00:00:00} to {@code 23:59:59}, and the DataTypes.TIMESTAMP value
>>>> >>>> range is from {@code 0000-01-01 00:00:00.000000000} to
>>>> >>>> {@code 9999-12-31 23:59:59.999999999}. My values are in range, so I
>>>> >>>> don't know why this fails. I also read that this may be a bug in
>>>> >>>> Java 8, but after changing the JDK to 11 the error still occurs.
>>>> >>>>
>>>> >>>> Can someone give me some help? Thanks in advance.
>>>> >>>>
>>>> >>>
>>>> >>>
>>>>
>>>
>>>
>>> --
>>> Best, Jingsong Lee
>>>
>>
>
> --
> Best, Jingsong Lee
>
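To make the conversion matrix discussed in this thread concrete, here is a rough sketch of how a lenient parser could first derive the "parsed" type and then convert to the requested one. Everything in it is an assumption for illustration: the class, the `Requested` enum and the `sessionZone` parameter are hypothetical names, the three SQL types are modelled as `LocalDateTime`, `OffsetDateTime` and `Instant`, and none of it is taken from the Flink code base.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.temporal.TemporalAccessor;

final class LenientTimestampSketch {

    // Hypothetical names for the three requested timestamp types.
    enum Requested { WITHOUT_TIME_ZONE, WITH_TIME_ZONE, WITH_LOCAL_TIME_ZONE }

    // Accepts "2019-07-09 02:02:00.040" as well as "2019-07-09T02:02:00.040Z".
    // Returns LocalDateTime, OffsetDateTime or Instant depending on the requested type.
    static Object parse(String text, Requested requested, ZoneId sessionZone) {
        // Normalise the SQL-style separator so one ISO formatter covers both spellings.
        String normalized = text.trim().replaceFirst(" ", "T");
        // parseBest tries the queries in order: with offset first, then without.
        TemporalAccessor parsed = DateTimeFormatter.ISO_DATE_TIME
                .parseBest(normalized, OffsetDateTime::from, LocalDateTime::from);

        if (parsed instanceof OffsetDateTime) {
            OffsetDateTime withZone = (OffsetDateTime) parsed;
            switch (requested) {
                case WITH_TIME_ZONE:
                    return withZone;                    // nothing to convert
                case WITH_LOCAL_TIME_ZONE:
                    return withZone.toInstant();        // keep the instant, drop the zone
                default:
                    return withZone.toLocalDateTime();  // drop the time zone information
            }
        }

        LocalDateTime withoutZone = (LocalDateTime) parsed;
        switch (requested) {
            case WITH_TIME_ZONE:
                // store the local (session) time zone with the data
                return withoutZone.atZone(sessionZone).toOffsetDateTime();
            case WITH_LOCAL_TIME_ZONE:
                // interpret the wall-clock time in the local (session) time zone
                return withoutZone.atZone(sessionZone).toInstant();
            default:
                return withoutZone;                     // nothing to convert
        }
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("UTC");
        System.out.println(parse("2019-07-09 02:02:00.040", Requested.WITHOUT_TIME_ZONE, zone));
        System.out.println(parse("2019-07-09T02:02:00.040Z", Requested.WITHOUT_TIME_ZONE, zone));
    }
}
```

This covers all six combinations, including the two identity cases Jark suggested adding. An RFC 3339 compatibility switch could then simply reject input for which the `LocalDateTime` branch is taken.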