Need help on Upsert-kafka connector with confluent schema registry


Need help on Upsert-kafka connector with confluent schema registry

Shamit
Hello Team,

I am facing an issue with the "upsert-kafka" connector, which should read Avro
messages deserialized using
"io.confluent.kafka.serializers.KafkaAvroDeserializer". The same setup works
with the "kafka" connector.

It looks like there is no way to pass the schema registry URL and subject name
the way we pass them when using the "kafka" connector.

Please help.


The table definition with upsert-kafka is below (not working):

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING,
    PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'lndcdcadsprpslproposalline',
    'key.format' = 'avro',
    'value.format' = 'avro',
    'properties.group.id' = 'dd',
    'properties.schema.registry.url' = 'http://localhost:8081',
    'properties.key.deserializer' = 'org.apache.kafka.common.serialization.LongDeserializer',
    'properties.value.deserializer' = 'io.confluent.kafka.serializers.KafkaAvroDeserializer'
)

ERROR:
Caused by: java.io.IOException: Failed to deserialize Avro record.
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:101)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:44)
    at org.apache.flink.api.common.serialization.DeserializationSchema.deserialize(DeserializationSchema.java:82)
    at org.apache.flink.streaming.connectors.kafka.table.DynamicKafkaDeserializationSchema.deserialize(DynamicKafkaDeserializationSchema.java:130)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:179)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:142)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:826)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:110)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:66)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:241)
Caused by: java.lang.ArrayIndexOutOfBoundsException: -1
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:460)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:283)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:187)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:259)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:247)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:179)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.flink.formats.avro.AvroDeserializationSchema.deserialize(AvroDeserializationSchema.java:139)
    at org.apache.flink.formats.avro.AvroRowDataDeserializationSchema.deserialize(AvroRowDataDeserializationSchema.java:98)
    ... 9 more



The table definition with the kafka connector is below (working):

CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING
) WITH (
    'connector' = 'kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.auto.offset.reset' = 'earliest',
    'topic' = 'lndcdcadsprpslproposalline',
    'format' = 'avro-confluent',
    'avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)

Regards,
Shamit



--
Sent from: http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/
Re: Need help on Upsert-kafka connector with confluent schema registry

Shengkai Fang
Hi Shamit,

I think the main problem is incorrect usage in the upsert-kafka DDL: you use
avro as the format rather than avro-confluent.
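For reference, here is a sketch of how the corrected upsert-kafka DDL might look, with avro-confluent declared for both key and value. The option names follow the Flink 1.12 conventions (key./value. prefixed format options), and the registry URL and subject are taken from your working kafka-connector example, so adjust them to your setup. Note that the plain avro format cannot skip the Confluent wire-format header (a magic byte plus a 4-byte schema id) that KafkaAvroSerializer prepends, which is why the Avro decoder fails with ArrayIndexOutOfBoundsException: -1.

```sql
CREATE TABLE proposalLine (
    PROPOSAL_LINE_ID BIGINT,
    LAST_MODIFIED_BY STRING,
    PRIMARY KEY (PROPOSAL_LINE_ID) NOT ENFORCED
) WITH (
    'connector' = 'upsert-kafka',
    'properties.bootstrap.servers' = 'localhost:9092',
    'topic' = 'lndcdcadsprpslproposalline',
    -- avro-confluent understands the Confluent wire format and
    -- fetches schemas from the registry by the embedded schema id
    'key.format' = 'avro-confluent',
    'key.avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'value.format' = 'avro-confluent',
    'value.avro-confluent.schema-registry.url' = 'http://localhost:8081',
    'value.avro-confluent.schema-registry.subject' = 'lndcdcadsprpslproposalline-value'
)
```

The properties.key.deserializer/properties.value.deserializer entries are dropped here: Flink's Kafka connectors consume raw bytes and apply the declared key/value formats themselves, so Kafka-client deserializer settings are not the way to plug in Confluent Avro decoding.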

The dev mailing list is used to discuss implementation details. Please send
questions like this to the user mailing list for help.

[1] https://flink.apache.org/gettinghelp.html#user-mailing-list

Shamit <[hidden email]> wrote on Sunday, February 7, 2021: