Hi all,

@NiYanchun Thank you for reporting this. Yes, I think we could improve the behaviour of the JSON format.

@Jark First of all, I do agree we could/should improve the "user-friendliness" of the JSON format (and unify the behaviour across text-based formats). I am not sure, though, that it is as simple as just ignoring the time zone here.

My suggestion would rather be to apply the logic of parsing a SQL timestamp literal (if the expected type is of LogicalTypeFamily.TIMESTAMP). That logic would also derive the "stored" type of the timestamp (either WITHOUT TIMEZONE or WITH TIMEZONE), and we would then apply a proper SQL conversion. The behaviour would then be as follows:

| parsed type | requested type | behaviour |
|---|---|---|
| WITHOUT TIMEZONE | WITH TIMEZONE | store the local timezone with the data |
| WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the data, interpret the time in the local timezone |
| WITH TIMEZONE | WITH LOCAL TIMEZONE | convert the timestamp to the local timezone and drop the time zone information |
| WITH TIMEZONE | WITHOUT TIMEZONE | drop the time zone information |

It might just boil down to what you said, "being more lenient with regards to parsing the time zone". Nevertheless, I think this approach gives better-defined behaviour, especially because it defines what happens when converting between representations with and without a time zone.

An implementation note: I think we should aim to base the implementation on the DataTypes already, rather than going back to the TypeInformation.

I would still try to keep the RFC 3339 compatibility mode, but maybe in that mode it would make sense not to support any types WITHOUT TIMEZONE? The mode would be enabled with a switch (disabled by default). As I understand the RFC, making the time zone mandatory is actually a big part of the standard, as it makes time types unambiguous.

What do you think?

PS: I cross-posted this on the dev ML.

Best,
Dawid
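To make the conversion matrix above concrete, here is a minimal standalone Java sketch. It is not Flink code. It assumes java.time stand-ins for the three SQL types: `LocalDateTime` for WITHOUT TIMEZONE, `OffsetDateTime` for WITH TIMEZONE, and `Instant` for WITH LOCAL TIMEZONE. It also assumes a `localZone` parameter for what the table calls the "local timezone". The class and enum names are hypothetical. The two identity combinations, which the table does not list, are included so that every parsed/requested pair is handled.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.temporal.Temporal;

/**
 * Hypothetical sketch of the parsed-type / requested-type matrix (not Flink code).
 * Assumed java.time stand-ins:
 *   WITHOUT TIME ZONE    -> LocalDateTime
 *   WITH TIME ZONE       -> OffsetDateTime
 *   WITH LOCAL TIME ZONE -> Instant (rendered in the local zone when displayed)
 */
public final class TimestampConversionMatrix {

    public enum Requested { WITHOUT_TIME_ZONE, WITH_TIME_ZONE, WITH_LOCAL_TIME_ZONE }

    /**
     * @param parsed    LocalDateTime if the text had no offset, OffsetDateTime if it had one
     * @param requested the type declared in the table schema
     * @param localZone the zone used wherever the matrix says "local timezone"
     */
    public static Temporal convert(Temporal parsed, Requested requested, ZoneId localZone) {
        if (parsed instanceof LocalDateTime) {
            LocalDateTime ldt = (LocalDateTime) parsed;
            switch (requested) {
                case WITHOUT_TIME_ZONE:
                    return ldt; // identity
                case WITH_TIME_ZONE:
                    // store the local time zone (as its offset at that moment) with the data
                    return ldt.atZone(localZone).toOffsetDateTime();
                case WITH_LOCAL_TIME_ZONE:
                    // the wall-clock value is interpreted in the local time zone
                    return ldt.atZone(localZone).toInstant();
            }
        } else if (parsed instanceof OffsetDateTime) {
            OffsetDateTime odt = (OffsetDateTime) parsed;
            switch (requested) {
                case WITHOUT_TIME_ZONE:
                    return odt.toLocalDateTime(); // drop the time zone information
                case WITH_TIME_ZONE:
                    return odt; // identity
                case WITH_LOCAL_TIME_ZONE:
                    // keep the same point in time and drop the offset; with the Instant
                    // stand-in, conversion to the local zone happens when rendering
                    return odt.toInstant();
            }
        }
        throw new IllegalArgumentException("Unsupported combination: " + parsed + " -> " + requested);
    }

    public static void main(String[] args) {
        ZoneId shanghai = ZoneId.of("Asia/Shanghai");
        LocalDateTime parsed = LocalDateTime.parse("2019-07-09T02:02:00.040");
        for (Requested r : Requested.values()) {
            System.out.println(r + " -> " + convert(parsed, r, shanghai));
        }
    }
}
```

The choice of `Instant` for WITH LOCAL TIMEZONE is part of the sketch, not a statement about Flink's internals. With this choice, "interpret in the local timezone" becomes an explicit `atZone(localZone)` step only for the WITHOUT TIMEZONE input.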
On 26/02/2020 03:45, Jark Wu wrote:
Hi Dawid,

I agree with you. If we want to loosen the format constraint, the important piece is the conversion matrix. The conversion matrix you listed makes sense to me. As I understand it, there should be 6 combinations. We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH TIMEZONE to make the matrix complete. When the community reaches an agreement on this, we should write it down in the documentation and follow the matrix in all text-based formats.

The RFC 3339 compatibility mode switch also sounds good to me.

Best,
Jark

On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <[hidden email]> wrote:
> On 26/02/2020 03:45, Jark Wu wrote:
> > Yes, I'm also in favor of loosening the datetime format constraint.
> > I guess most users don't know there is a JSON standard which follows RFC 3339.
> >
> > Best,
> > Jark
> >
> > On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote:
> >> Yes, these type definitions are general. As a user/developer, I would support "loosen it for usability". If not, maybe add some explanation about JSON.
> >>
> >> Original Message
> >> *Sender:* Jark Wu<[hidden email]>
> >> *Recipient:* Outlook<[hidden email]>; Dawid Wysakowicz<[hidden email]>
> >> *Cc:* godfrey he<[hidden email]>; Leonard Xu<[hidden email]>; user<[hidden email]>
> >> *Date:* Wednesday, Feb 26, 2020 09:55
> >> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
> >>
> >> Hi Outlook,
> >>
> >> The explanation in DataTypes is correct; it is compliant with the SQL standard.
> >> The problem is that JsonRowDeserializationSchema only supports RFC 3339.
> >> On the other hand, CsvRowDeserializationSchema can parse "2019-07-09 02:02:00.040".
> >>
> >> So the question is: shall we insist on the RFC 3339 "standard", or shall we loosen it for usability?
> >> What do you think, @Dawid Wysakowicz <[hidden email]>?
> >>
> >> Best,
> >> Jark
> >>
> >> On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote:
> >>> Thanks Godfrey and Leonard, I tried your answers and the result is OK.
> >>>
> >>> BTW, if only this format is going to be accepted for a long time, I think the docs of the TIME and TIMESTAMP methods in `org.apache.flink.table.api.DataTypes` should be updated, because the documentation currently does not reflect what the methods really support. For example:
> >>>
> >>> ```
> >>> /**
> >>>  * Data type of a time WITHOUT time zone {@code TIME} with no fractional seconds by default.
> >>>  *
> >>>  * <p>An instance consists of {@code hour:minute:second} with up to second precision
> >>>  * and values ranging from {@code 00:00:00} to {@code 23:59:59}.
> >>>  *
> >>>  * <p>Compared to the SQL standard, leap seconds (23:59:60 and 23:59:61) are not supported as the
> >>>  * semantics are closer to {@link java.time.LocalTime}. A time WITH time zone is not provided.
> >>>  *
> >>>  * @see #TIME(int)
> >>>  * @see TimeType
> >>>  */
> >>> public static DataType TIME() {
> >>>     return new AtomicDataType(new TimeType());
> >>> }
> >>> ```
> >>>
> >>> Thanks again.
> >>>
> >>> Original Message
> >>> *Sender:* Leonard Xu<[hidden email]>
> >>> *Recipient:* godfrey he<[hidden email]>
> >>> *Cc:* Outlook<[hidden email]>; user<[hidden email]>
> >>> *Date:* Tuesday, Feb 25, 2020 22:56
> >>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API
> >>>
> >>> Hi Outlook,
> >>> Godfrey is right, you should follow the JSON format [1] for the date-time values in your JSON messages.
> >>> You can use the following code to produce a JSON date-time string:
> >>> ```
> >>> import java.text.*;
> >>> import java.util.*;
> >>>
> >>> long time = System.currentTimeMillis();
> >>> DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
> >>> // The literal 'Z' claims UTC, so format in UTC instead of the JVM's default zone.
> >>> dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
> >>> String jsonSchemaDate = dateFormat.format(new Date(time));
> >>> ```
> >>> [1] https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times
> >>>
> >>> On Feb 25, 2020, at 22:15, godfrey he <[hidden email]> wrote:
> >>>
> >>> Hi, I found that JsonRowDeserializationSchema only supports date-time values with a time zone, according to RFC 3339. So you need to add a time zone to the time data (like 14:02:00Z) and the timestamp data (2019-07-09T02:02:00.040Z). Hope it helps.
> >>>
> >>> Bests,
> >>> godfrey
> >>>
> >>> On Tue, Feb 25, 2020 at 5:49 PM, Outlook <[hidden email]> wrote:
> >>>> By the way, my Flink version is 1.10.0.
> >>>>
> >>>> Original Message
> >>>> *Sender:* Outlook<[hidden email]>
> >>>> *Recipient:* user<[hidden email]>
> >>>> *Date:* Tuesday, Feb 25, 2020 17:43
> >>>> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API
> >>>>
> >>>> Hi all,
> >>>>
> >>>> I read JSON data from Kafka and print it to the console. When I do this, an error occurs during time/timestamp deserialization.
> >>>>
> >>>> JSON data in Kafka:
> >>>>
> >>>> ```
> >>>> {
> >>>>   "server_date": "2019-07-09",
> >>>>   "server_time": "14:02:00",
> >>>>   "reqsndtime_c": "2019-07-09 02:02:00.040"
> >>>> }
> >>>> ```
> >>>>
> >>>> Flink code:
> >>>>
> >>>> ```
> >>>> bsTableEnv.connect(
> >>>>         new Kafka()
> >>>>             .version("universal")
> >>>>             .topic("xxx")
> >>>>             .property("bootstrap.servers", "localhost:9092")
> >>>>             .property("zookeeper.connect", "localhost:2181")
> >>>>             .property("group.id", "g1")
> >>>>             .startFromEarliest()
> >>>>     ).withFormat(
> >>>>         new Json()
> >>>>             .failOnMissingField(false)
> >>>>     ).withSchema(
> >>>>         new Schema()
> >>>>             .field("server_date", DataTypes.DATE())
> >>>>             .field("server_time", DataTypes.TIME())
> >>>>             .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
> >>>>     ).inAppendMode()
> >>>>     .createTemporaryTable("xxx");
> >>>> ```
> >>>>
> >>>> server_date with DataTypes.DATE() is OK, but server_time with DataTypes.TIME() and reqsndtime_c with DataTypes.TIMESTAMP(3) cause errors. If I change them to DataTypes.STRING(), everything works.
> >>>>
> >>>> Error message:
> >>>> ```
> >>>> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
> >>>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
> >>>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
> >>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
> >>>>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
> >>>>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
> >>>>     at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
> >>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
> >>>>     at cn.com.agree.Main.main(Main.java:122)
> >>>> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
> >>>>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
> >>>>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
> >>>>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
> >>>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
> >>>>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
> >>>>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
> >>>>     at akka.dispatch.OnComplete.internal(Future.scala:264)
> >>>>     at akka.dispatch.OnComplete.internal(Future.scala:261)
> >>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
> >>>>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
> >>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >>>>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
> >>>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
> >>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
> >>>>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
> >>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
> >>>>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
> >>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
> >>>>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
> >>>>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
> >>>>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
> >>>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
> >>>>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
> >>>>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
> >>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
> >>>>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> >>>>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> >>>>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> >>>>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> >>>> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
> >>>>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
> >>>>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
> >>>>     ... 31 more
> >>>> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
> >>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
> >>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
> >>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
> >>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
> >>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
> >>>>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
> >>>>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> >>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> >>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> >>>>     at java.lang.reflect.Method.invoke(Method.java:498)
> >>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
> >>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
> >>>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
> >>>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
> >>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
> >>>>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
> >>>>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
> >>>>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
> >>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
> >>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >>>>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
> >>>>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
> >>>>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
> >>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
> >>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
> >>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
> >>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
> >>>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
> >>>>     ... 4 more
> >>>> Caused by: java.io.IOException: Failed to deserialize JSON object.
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
> >>>>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
> >>>>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
> >>>>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
> >>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
> >>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> >>>> Caused by: java.time.format.DateTimeParseException: Text '14:02:00' could not be parsed at index 8
> >>>>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
> >>>>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
> >>>>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
> >>>>     ... 7 more
> >>>>
> >>>> Process finished with exit code 1
> >>>> ```
> >>>>
> >>>> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception. According to the docs, the DataTypes.TIME() value range is from {@code 00:00:00} to {@code 23:59:59}, and the DataTypes.TIMESTAMP value range is from {@code 0000-01-01 00:00:00.000000000} to {@code 9999-12-31 23:59:59.999999999}. My values are within these ranges, so I don't know why parsing fails. I read that this may be a bug in Java 8, so I switched the JDK to 11, but the error still occurs.
> >>>>
> >>>> Can someone give me some help? Thanks in advance.
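The reported failure can be reproduced with plain java.time, and the same code shows what a lenient mode versus an RFC 3339 mode could look like. This is a standalone sketch, not `JsonRowDeserializationSchema`. I have not checked which formatter Flink uses internally. `DateTimeFormatter.ISO_OFFSET_TIME` is used here only because it also requires an offset, so parsing `"14:02:00"` fails at index 8, where the offset is expected. The `rfc3339Mode` flag is a hypothetical stand-in for the compatibility switch discussed in this thread. In lenient mode, the parser derives whether the text carried a time zone and returns an `Offset*` or a `Local*` value accordingly.

```java
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.time.temporal.TemporalAccessor;

/** Hypothetical sketch: lenient vs. strict (RFC 3339-like) parsing of JSON time strings. */
public final class JsonTimeParsing {

    /** Returns an OffsetDateTime if the text has an offset, otherwise a LocalDateTime. */
    public static TemporalAccessor parseTimestamp(String text, boolean rfc3339Mode) {
        if (rfc3339Mode) {
            // Strict: 'T' separator, optional fraction, mandatory offset ("Z", "+08:00").
            return OffsetDateTime.parse(text, DateTimeFormatter.ISO_OFFSET_DATE_TIME);
        }
        // Lenient: accept the SQL literal style "yyyy-MM-dd HH:mm:ss[.fff]" by turning the
        // first space into 'T', then try with an offset first and without one second.
        String normalized = text.trim().replaceFirst(" ", "T");
        return DateTimeFormatter.ISO_DATE_TIME.parseBest(
                normalized, OffsetDateTime::from, LocalDateTime::from);
    }

    /** Same idea for TIME values such as "14:02:00" or "14:02:00Z". */
    public static TemporalAccessor parseTime(String text, boolean rfc3339Mode) {
        if (rfc3339Mode) {
            return OffsetTime.parse(text, DateTimeFormatter.ISO_OFFSET_TIME);
        }
        return DateTimeFormatter.ISO_TIME.parseBest(text, OffsetTime::from, LocalTime::from);
    }

    public static void main(String[] args) {
        System.out.println(parseTimestamp("2019-07-09 02:02:00.040", false)); // a LocalDateTime
        System.out.println(parseTimestamp("2019-07-09T02:02:00.040Z", true)); // an OffsetDateTime
        System.out.println(parseTime("14:02:00", false));                      // a LocalTime
        try {
            parseTime("14:02:00", true);
        } catch (DateTimeParseException e) {
            // Strict mode rejects the value because the offset is missing after "14:02:00".
            System.out.println(e.getMessage() + " (error index " + e.getErrorIndex() + ")");
        }
    }
}
```

The values returned by the lenient branch could then be fed into a conversion matrix like the one in this thread to reach the type declared in the schema.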
Thanks all for your discussion.
Hi Dawid,

+1 to applying the logic of parsing a SQL timestamp literal.

I don't fully understand the matrix you listed. Should these be the semantics of a SQL cast? Do you mean they are implicit casts in the JSON parser? I doubt that, because these implicit casts are not supported in LogicalTypeCasts. It would also be hard for users to understand, since the casts would happen silently.

How about adding a "timestampFormat" property to the JSON parser? Its default value would be the SQL timestamp literal format, and users could configure it.

Best,
Jingsong Lee

On Wed, Feb 26, 2020 at 6:39 PM Jark Wu <[hidden email]> wrote:
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236) > >>>> at > >>>> > org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131) > >>>> ... 7 more > >>>> > >>>> Process finished with exit code 1 > >>>> ``` > >>>> > >>>> reqsndtime_c with DataTypes.TIMESTAMP(3) has similar exception. I see > >>>> the doc, DataTypes.TIME() value range is from {@code 00:00:00} to > {@code > >>>> 23:59:59} , DataTypes.TIMESTAMP value range is from {@code 0000-01-01 > >>>> 00:00:00.000000000} to > >>>> * {@code 9999-12-31 23:59:59.999999999}. And my value is in the > range, > >>>> I don’t know why. And I see this may be bug in java 8, I change jdk > to 11, > >>>> > >>>> error still occurs. > >>>> > >>>> Can someone give me some help, thanks in advance. > >>>> > >>> > >>> > -- Best, Jingsong Lee
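The "Text '14:02:00' could not be parsed at index 8" error in the quoted trace can be reproduced with plain `java.time`. This is only an illustration: it uses `DateTimeFormatter.ISO_OFFSET_TIME` as a stand-in for the RFC 3339 formatter inside `JsonRowDeserializationSchema` (an assumption: the two are expected to behave alike for this input), and `Rfc3339TimeDemo` / `tryParse` are hypothetical names. Index 8 is the position right after `14:02:00`, where a mandatory offset such as `Z` is expected.

```java
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

final class Rfc3339TimeDemo {

    // ISO_OFFSET_TIME requires an offset such as "Z" or "+08:00" after the seconds,
    // roughly what an RFC 3339 full-time demands (assumption: Flink's own
    // formatter rejects "14:02:00" in the same way).
    static String tryParse(String text) {
        try {
            OffsetTime t = OffsetTime.parse(text, DateTimeFormatter.ISO_OFFSET_TIME);
            return "ok: " + t;
        } catch (DateTimeParseException e) {
            return "error at index " + e.getErrorIndex() + ": " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryParse("14:02:00"));   // no offset: fails at index 8
        System.out.println(tryParse("14:02:00Z"));  // RFC 3339 style with "Z": parses
        // Without any time zone, a plain local-time parse accepts the original value.
        System.out.println(LocalTime.parse("14:02:00"));
    }
}
```

Adding a `Z` (or another offset) to the data, as suggested earlier in the thread, is what makes the strict parser accept the value.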
Hi Jingsong,
I don't think it should follow SQL CAST semantics, because this conversion happens outside of SQL: it happens in connectors, which convert users'/external formats into SQL types. I also doubt that a "timestampFormat" property would work in all cases, because timestamp formats may vary and be mixed within a single topic.
Best, Jark
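To illustrate the mixed-format concern, here is a minimal sketch of a lenient parser for a TIMESTAMP (WITHOUT TIME ZONE) field: it first tries the SQL timestamp literal form and then falls back to the RFC 3339 form with an offset. `LenientTimestampParser` and `parseTimestamp` are hypothetical and not part of Flink's API; the fallback follows the "drop the time zone information" row of the matrix proposed earlier in the thread.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;

final class LenientTimestampParser {

    // SQL timestamp literal style, e.g. "2019-07-09 02:02:00.040" (fraction optional).
    static final DateTimeFormatter SQL_TIMESTAMP = new DateTimeFormatterBuilder()
            .append(DateTimeFormatter.ISO_LOCAL_DATE)
            .appendLiteral(' ')
            .append(DateTimeFormatter.ISO_LOCAL_TIME)
            .toFormatter();

    // Hypothetical helper: parses a value for a TIMESTAMP (WITHOUT TIME ZONE) field.
    // Throws DateTimeParseException if neither supported form matches.
    static LocalDateTime parseTimestamp(String text) {
        try {
            return LocalDateTime.parse(text, SQL_TIMESTAMP);
        } catch (DateTimeParseException notSqlLiteral) {
            // Fall back to the RFC 3339 form with an offset, e.g. "2019-07-09T02:02:00.040Z".
            // The offset is dropped: the wall-clock time is kept unchanged.
            return OffsetDateTime.parse(text, DateTimeFormatter.ISO_OFFSET_DATE_TIME)
                    .toLocalDateTime();
        }
    }

    public static void main(String[] args) {
        // Both styles may appear mixed within one Kafka topic.
        String[] mixed = {
            "2019-07-09 02:02:00.040",
            "2019-07-09T02:02:00.040Z",
            "2019-07-09T02:02:00.040+08:00"
        };
        for (String s : mixed) {
            System.out.println(s + " -> " + parseTimestamp(s));
        }
    }
}
```

A single fixed pattern would reject one of the two styles, while this fallback accepts both. The trade-off is visible in the last input: dropping the offset maps `...040Z` and `...040+08:00` to the same local value, which is one of the semantics the thread still has to agree on.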
Hi Jark,
As I see it, the matrix is SQL cast. If we need to bring in another conversion matrix that is different from SQL cast, I don't understand the benefits, and it makes things difficult to understand. It also seems bad to silently change timestamps from different time zones to the same value. I have seen a lot of timestamp formats: SQL, ISO, RFC. I think a "timestampFormat" property could help users deal with these various formats. Which approach do you think can solve all the problems?
Best, Jingsong Lee
On Wed, Feb 26, 2020 at 10:45 PM Jark Wu <[hidden email]> wrote: > Hi Jingsong, > > I don't think it should follow SQL CAST semantics, because it is out of > SQL, it happens in connectors which converts users'/external's format into > SQL types. > I also doubt "timestampFormat" may not work in some cases, because the > timestamp format maybe various and mixed in a topic. > > Best, > Jark > > On Wed, 26 Feb 2020 at 22:20, Jingsong Li <[hidden email]> wrote: > >> Thanks all for your discussion. >> >> Hi Dawid, >> >> +1 to apply the logic of parsing a SQL timestamp literal. >> >> I don't fully understand the matrix your list. Should this be the >> semantics of SQL cast? >> Do you mean this is implicit cast in JSON parser? >> I doubt that because these implicit casts are not support >> in LogicalTypeCasts. And it is not so good to understand when it occur >> silently. >> >> How about add "timestampFormat" property to JSON parser? Its default >> value is SQL timestamp literal format. And user can configure this. >> >> Best, >> Jingsong Lee >> >> On Wed, Feb 26, 2020 at 6:39 PM Jark Wu <[hidden email]> wrote: >> >>> Hi Dawid, >>> >>> I agree with you. If we want to loosen the format constraint, the >>> important piece is the conversion matrix. >>> >>> The conversion matrix you listed makes sense to me. From my >>> understanding, >>> there should be 6 combination. >>> We can add WITHOUT TIMEZONE => WITHOUT TIMEZONE and WITH TIMEZONE => WITH >>> TIMEZONE to make the matrix complete.
>>> When the community reach an agreement on this, we should write it down on >>> the documentation and follow the matrix in all text-based formats. >>> >>> Regarding to the RFC 3339 compatibility mode switch, it also sounds good >>> to >>> me. >>> >>> Best, >>> Jark >>> >>> On Wed, 26 Feb 2020 at 17:44, Dawid Wysakowicz <[hidden email]> >>> wrote: >>> >>> > Hi all, >>> > >>> > @NiYanchun Thank you for reporting this. Yes I think we could improve >>> the >>> > behaviour of the JSON format. >>> > >>> > @Jark First of all I do agree we could/should improve the >>> > "user-friendliness" of the JSON format (and unify the behavior across >>> text >>> > based formats). I am not sure though if it is as simple as just ignore >>> the >>> > time zone here. >>> > >>> > My suggestion would be rather to apply the logic of parsing a SQL >>> > timestamp literal (if the expected type is of >>> LogicalTypeFamily.TIMESTAMP), >>> > which would actually also derive the "stored" type of the timestamp >>> (either >>> > WITHOUT TIMEZONE or WITH TIMEZONE) and then apply a proper sql >>> conversion. >>> > Therefore if the >>> > >>> > parsed type | requested type | >>> behaviour >>> > >>> > WITHOUT TIMEZONE | WITH TIMEZONE | store the local >>> > timezone with the data >>> > >>> > WITHOUT TIMEZONE | WITH LOCAL TIMEZONE | do nothing in the >>> data, >>> > interpret the time in local timezone >>> > >>> > WITH TIMEZONE | WITH LOCAL TIMEZONE | convert the >>> timestamp >>> > to local timezone and drop the time zone information >>> > >>> > WITH TIMEZONE | WITHOUT TIMEZONE | drop the time >>> zone >>> > information >>> > >>> > It might just boil down to what you said "being more lenient with >>> regards >>> > to parsing the time zone". Nevertheless I think this way it is a bit >>> better >>> > defined behaviour, especially as it has a defined behaviour when >>> converting >>> > between representation with or without time zone. >>> > >>> > An implementation note. 
I think we should aim to base the >>> implementation >>> > on the DataTypes already rather than going back to the TypeInformation. >>> > >>> > I would still try to leave the RFC 3339 compatibility mode, but maybe >>> for >>> > that mode it would make sense to not support any types WITHOUT >>> TIMEZONE? >>> > This would be enabled with a switch (disabled by default). As I >>> understand >>> > the RFC, making the time zone mandatory is actually a big part of the >>> > standard as it makes time types unambiguous. >>> > >>> > What do you think? >>> > >>> > Ps. I cross posted this on the dev ML. >>> > >>> > Best, >>> > >>> > Dawid >>> > >>> > >>> > On 26/02/2020 03:45, Jark Wu wrote: >>> > >>> > Yes, I'm also in favor of loosen the datetime format constraint. >>> > I guess most of the users don't know there is a JSON standard which >>> > follows RFC 3339. >>> > >>> > Best, >>> > Jark >>> > >>> > On Wed, 26 Feb 2020 at 10:06, NiYanchun <[hidden email]> wrote: >>> > >>> >> Yes, these Types definition are general. As a user/developer, I would >>> >> support “loosen it for usability”. If not, may add some explanation >>> >> about JSON. >>> >> >>> >> >>> >> >>> >> Original Message >>> >> *Sender:* Jark Wu<[hidden email]> >>> >> *Recipient:* Outlook<[hidden email]>; Dawid Wysakowicz< >>> >> [hidden email]> >>> >> *Cc:* godfrey he<[hidden email]>; Leonard Xu<[hidden email]>; >>> >> user<[hidden email]> >>> >> *Date:* Wednesday, Feb 26, 2020 09:55 >>> >> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API >>> >> >>> >> Hi Outlook, >>> >> >>> >> The explanation in DataTypes is correct, it is compliant to SQL >>> standard. >>> >> The problem is that JsonRowDeserializationSchema only support >>> RFC-3339. >>> >> On the other hand, CsvRowDeserializationSchema supports to parse >>> >> "2019-07-09 02:02:00.040". >>> >> >>> >> So the question is shall we insist on the RFC-3339 "standard"? Shall >>> we >>> >> loosen it for usability? 
>>> >> What do you think @Dawid Wysakowicz <[hidden email]> ? >>> >> >>> >> Best, >>> >> Jark >>> >> >>> >> On Wed, 26 Feb 2020 at 09:29, Outlook <[hidden email]> wrote: >>> >> >>> >>> Thanks Godfrey and Leonard, I tried your answers, result is OK. >>> >>> >>> >>> >>> >>> BTW, I think if only accept such format for a long time, the TIME >>> and >>> >>> TIMESTAMP methods' doc in `org.apache.flink.table.api.DataTypes` may >>> be >>> >>> better to update, >>> >>> >>> >>> because the document now is not what the method really support. For >>> >>> example, >>> >>> >>> >>> >>> >>> ``` >>> >>> /** >>> >>> * Data type of a time WITHOUT time zone {@code TIME} with no >>> fractional >>> >>> seconds by default. >>> >>> * >>> >>> * <p>An instance consists of {@code hour:minute:second} with up to >>> >>> second precision >>> >>> * and values ranging from {@code 00:00:00} to {@code 23:59:59}. >>> >>> * >>> >>> * <p>Compared to the SQL standard, leap seconds (23:59:60 and >>> 23:59:61) >>> >>> are not supported as the >>> >>> * semantics are closer to {@link java.time.LocalTime}. A time WITH >>> time >>> >>> zone is not provided. >>> >>> * >>> >>> * @see #TIME(int) >>> >>> * @see TimeType >>> >>> */ >>> >>> public static DataType TIME() { >>> >>> return new AtomicDataType(new TimeType()); >>> >>> >>> >>> }``` >>> >>> >>> >>> >>> >>> Thanks again. >>> >>> >>> >>> Original Message >>> >>> *Sender:* Leonard Xu<[hidden email]> >>> >>> *Recipient:* godfrey he<[hidden email]> >>> >>> *Cc:* Outlook<[hidden email]>; user<[hidden email]> >>> >>> *Date:* Tuesday, Feb 25, 2020 22:56 >>> >>> *Subject:* Re: TIME/TIMESTAMP parse in Flink TABLE/SQL API >>> >>> >>> >>> Hi,Outlook >>> >>> Godfrey is right, you should follow the json format[1] when you parse >>> >>> your json message. >>> >>> You can use following code to produce a json data-time String. 
>>> >>> ``` >>> >>> >>> >>> Long time = System.currentTimeMillis();DateFormat dateFormat = new >>> SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");Date date = new >>> Date(time);String jsonSchemaDate = dateFormat.format(date); >>> >>> >>> >>> ``` >>> >>> [1] >>> >>> >>> https://json-schema.org/understanding-json-schema/reference/string.html#dates-and-times >>> >>> >>> >>> 在 2020年2月25日,22:15,godfrey he <[hidden email]> 写道: >>> >>> >>> >>> hi, I find that JsonRowDeserializationSchema only supports date-time >>> >>> with timezone according to RFC 3339. So you need add timezone to >>> time data >>> >>> (like 14:02:00Z) and timestamp data(2019-07-09T02:02:00.040Z). Hope >>> it can >>> >>> help you. >>> >>> >>> >>> Bests, >>> >>> godfrey >>> >>> >>> >>> Outlook <[hidden email]> 于2020年2月25日周二 下午5:49写道: >>> >>> >>> >>>> By the way, my flink version is 1.10.0. >>> >>>> >>> >>>> Original Message >>> >>>> *Sender:* Outlook<[hidden email]> >>> >>>> *Recipient:* user<[hidden email]> >>> >>>> *Date:* Tuesday, Feb 25, 2020 17:43 >>> >>>> *Subject:* TIME/TIMESTAMP parse in Flink TABLE/SQL API >>> >>>> >>> >>>> Hi all, >>> >>>> >>> >>>> I read json data from kafka, and print to console. When I do this, >>> some >>> >>>> error occurs when time/timestamp deserialization. 
>
> JSON data in Kafka:
>
> ```
> {
>   "server_date": "2019-07-09",
>   "server_time": "14:02:00",
>   "reqsndtime_c": "2019-07-09 02:02:00.040"
> }
> ```
>
> Flink code:
>
> ```
> bsTableEnv.connect(
>     new Kafka()
>         .version("universal")
>         .topic("xxx")
>         .property("bootstrap.servers", "localhost:9092")
>         .property("zookeeper.connect", "localhost:2181")
>         .property("group.id", "g1")
>         .startFromEarliest()
> ).withFormat(
>     new Json()
>         .failOnMissingField(false)
> ).withSchema(
>     new Schema()
>         .field("server_date", DataTypes.DATE())
>         .field("server_time", DataTypes.TIME())
>         .field("reqsndtime_c", DataTypes.TIMESTAMP(3))
> ).inAppendMode()
>  .createTemporaryTable("xxx");
> ```
>
> server_date with DataTypes.DATE() is parsed fine, but server_time with DataTypes.TIME() and reqsndtime_c with DataTypes.TIMESTAMP(3) cause errors. If I change them to DataTypes.STRING(), everything works.
>
> Error message:
> ```
> Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1640)
>     at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:74)
>     at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1620)
>     at org.apache.flink.table.planner.delegation.StreamExecutor.execute(StreamExecutor.java:42)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:643)
>     at cn.com.agree.Main.main(Main.java:122)
> Caused by: org.apache.flink.client.program.ProgramInvocationException: Job failed (JobID: 395d1ba3d41f92734d4ef25aa5f9b4a1)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:112)
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:602)
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:577)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1962)
>     at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:874)
>     at akka.dispatch.OnComplete.internal(Future.scala:264)
>     at akka.dispatch.OnComplete.internal(Future.scala:261)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
>     at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>     at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:74)
>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:44)
>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:252)
>     at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:572)
>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
>     at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:436)
>     at scala.concurrent.Future$$anonfun$andThen$1.apply(Future.scala:435)
>     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:36)
>     at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:91)
>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>     at akka.dispatch.BatchingExecutor$BlockableBatch$$anonfun$run$1.apply(BatchingExecutor.scala:91)
>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>     at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:90)
>     at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
>     at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:147)
>     at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:110)
>     ... 31 more
> Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
>     at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:498)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     ... 4 more
> Caused by: java.io.IOException: Failed to deserialize JSON object.
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:133)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:76)
>     at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
>     at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
> Caused by: java.time.format.DateTimeParseException: Text '14:02:00' could not be parsed at index 8
>     at java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
>     at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertToLocalTime(JsonRowDeserializationSchema.java:390)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:439)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:418)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:236)
>     at org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:131)
>     ... 7 more
>
> Process finished with exit code 1
> ```
>
> reqsndtime_c with DataTypes.TIMESTAMP(3) throws a similar exception. According to the docs, the DataTypes.TIME() value range is from {@code 00:00:00} to {@code 23:59:59}, and the DataTypes.TIMESTAMP value range is from {@code 0000-01-01 00:00:00.000000000} to {@code 9999-12-31 23:59:59.999999999}. My values are within these ranges, so I don't know why parsing fails. I also read that this may be a bug in Java 8, but after switching to JDK 11 the error still occurs.
>
> Can someone help me? Thanks in advance.
>
> --
> Best, Jingsong Lee

--
Best, Jingsong Lee
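The parse failure in the original question can be reproduced with the JDK alone. The following is a minimal sketch, not Flink's code: it assumes, as godfrey explains, that the Flink 1.10 JSON format expects RFC 3339 values with a mandatory offset, and it uses `java.time.OffsetTime` as a stand-in for the format's internal formatter. The zone-less text `14:02:00` is consumed up to index 8, where the required offset is missing, which matches the `could not be parsed at index 8` message in the stack trace. The class `Rfc3339Demo` and its methods are hypothetical; the conversion helpers apply godfrey's workaround on the producer side and assume the original zone-less values are UTC.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;

// Hypothetical helper class for illustration; not part of Flink.
public class Rfc3339Demo {
    // Zone-less SQL-literal style used in the original JSON, e.g. "2019-07-09 02:02:00.040".
    static final DateTimeFormatter SQL_TIMESTAMP_MILLIS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd HH:mm:ss.SSS");
    // RFC 3339 style; the pattern letters "XXX" print "Z" for a zero offset.
    static final DateTimeFormatter RFC3339_TIMESTAMP_MILLIS =
            DateTimeFormatter.ofPattern("uuuu-MM-dd'T'HH:mm:ss.SSSXXX");
    static final DateTimeFormatter RFC3339_TIME =
            DateTimeFormatter.ofPattern("HH:mm:ssXXX");

    /** Returns the index where an offset-requiring parser rejects the text, or -1 if it parses. */
    static int offsetTimeErrorIndex(String text) {
        try {
            OffsetTime.parse(text); // ISO_OFFSET_TIME: the offset (e.g. "Z") is mandatory
            return -1;
        } catch (DateTimeParseException e) {
            return e.getErrorIndex();
        }
    }

    /** "14:02:00" -> "14:02:00Z", ASSUMING the zone-less value is UTC. */
    static String timeToRfc3339(String sqlTime) {
        return LocalTime.parse(sqlTime).atOffset(ZoneOffset.UTC).format(RFC3339_TIME);
    }

    /** "2019-07-09 02:02:00.040" -> "2019-07-09T02:02:00.040Z", ASSUMING the value is UTC. */
    static String timestampToRfc3339(String sqlTimestamp) {
        return LocalDateTime.parse(sqlTimestamp, SQL_TIMESTAMP_MILLIS)
                .atOffset(ZoneOffset.UTC)
                .format(RFC3339_TIMESTAMP_MILLIS);
    }

    /** java.time counterpart of the SimpleDateFormat snippet: epoch millis -> RFC 3339 in UTC. */
    static String epochMillisToRfc3339(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis)
                .atOffset(ZoneOffset.UTC)
                .format(RFC3339_TIMESTAMP_MILLIS);
    }

    public static void main(String[] args) {
        System.out.println(offsetTimeErrorIndex("14:02:00"));  // 8
        System.out.println(offsetTimeErrorIndex("14:02:00Z")); // -1
        System.out.println(timeToRfc3339("14:02:00"));         // 14:02:00Z
        System.out.println(timestampToRfc3339("2019-07-09 02:02:00.040")); // 2019-07-09T02:02:00.040Z
        System.out.println(epochMillisToRfc3339(System.currentTimeMillis()));
    }
}
```

If the producer's values are not in UTC, the `ZoneOffset.UTC` calls would have to be replaced with the real offset; otherwise the resulting instants would be shifted.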
Hi Jingsong, Dawid,
I created https://issues.apache.org/jira/browse/FLINK-16725 to track this issue. We can continue the discussion there.

Best,
Jark

On Thu, 27 Feb 2020 at 10:32, Jingsong Li <[hidden email]> wrote:

> Hi Jark,
>
> The matrix I see is SQL cast. If we need bring another conversion matrix that is different from SQL cast, I don't understand the benefits. It makes me difficult to understand.
> And It seems bad to change the timestamp of different time zones to the same value silently.
>
> I have seen a lot of timestamp formats, SQL, ISO, RFC. I can think that a "timestampFormat" could help them to deal with various formats.
> What way do you think can solve all the problems?
>
> Best,
> Jingsong Lee
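FLINK-16725 tracks loosening this constraint. As a rough illustration only (a hypothetical helper, not Flink's API and not the eventual fix), a lenient parser for TIMESTAMP WITHOUT TIME ZONE could first try the SQL timestamp literal format (`2019-07-09 02:02:00.040`) and fall back to RFC 3339 (`2019-07-09T02:02:00.040Z`). For RFC 3339 input this sketch drops the offset and keeps the wall-clock time, following the "WITH TIMEZONE | WITHOUT TIMEZONE | drop the time zone information" row of Dawid's matrix; since Jingsong questioned silently mapping different time zones to the same value, treat that choice as an assumption.

```java
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeFormatterBuilder;
import java.time.format.DateTimeParseException;
import java.time.temporal.ChronoField;

// Hypothetical lenient parser for TIMESTAMP WITHOUT TIME ZONE; NOT Flink's API.
public class LenientTimestampParser {
    // SQL timestamp literal: "uuuu-MM-dd HH:mm:ss" plus 0-9 optional fractional digits.
    private static final DateTimeFormatter SQL_TIMESTAMP = new DateTimeFormatterBuilder()
            .appendPattern("uuuu-MM-dd HH:mm:ss")
            .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
            .toFormatter();

    /**
     * Accepts a SQL literal ("2019-07-09 02:02:00.040") or an RFC 3339 value
     * ("2019-07-09T02:02:00.040Z"). For RFC 3339 input the offset is dropped and the
     * wall-clock time is kept (an assumption; the value is NOT shifted to another zone).
     */
    static LocalDateTime parse(String text) {
        try {
            return LocalDateTime.parse(text, SQL_TIMESTAMP);
        } catch (DateTimeParseException sqlFailure) {
            try {
                // ISO_OFFSET_DATE_TIME: 'T' separator and a mandatory offset.
                return OffsetDateTime.parse(text).toLocalDateTime();
            } catch (DateTimeParseException rfcFailure) {
                rfcFailure.addSuppressed(sqlFailure); // keep both failure reasons for debugging
                throw rfcFailure;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(parse("2019-07-09 02:02:00.040"));       // 2019-07-09T02:02:00.040
        System.out.println(parse("2019-07-09T02:02:00.040Z"));      // 2019-07-09T02:02:00.040
        System.out.println(parse("2019-07-09T10:02:00.040+08:00")); // 2019-07-09T10:02:00.040
    }
}
```

Converting to a fixed zone instead of dropping the offset would only change the fallback branch, for example `OffsetDateTime.parse(text).atZoneSameInstant(zone).toLocalDateTime()`.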