Zekun Yu created FLINK-19332:
--------------------------------

             Summary: Special characters issue using Kinesis Data Analytics for Apache Flink
                 Key: FLINK-19332
                 URL: https://issues.apache.org/jira/browse/FLINK-19332
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kinesis
    Affects Versions: 1.8.2
            Reporter: Zekun Yu
             Fix For: 1.8.2

Hi there,

I am encountering a special-character issue while using Kinesis Data Analytics for Apache Flink (KDA).

Our KDA application processes data and writes its output to a Kinesis stream. A Lambda function subscribes to that stream and reads records from it.

The library I am using in the KDA application is "org.apache.flink.streaming.connectors.kinesis".

Our KDA application outputs only a single record to the Kinesis sink using "collector.collect()" for a single key (details can be found below).

Most of the time, the record received by the Lambda looks perfectly good.

However, occasionally, when two records are sent to the Kinesis sink using "collector.collect()" at the same time, *we noticed that those two records are combined somehow and there are some special characters in the record received by the Lambda function*.

Below are some technical details:

The KDA application does not use any "TimeWindow()" but uses "keyBy()" on some keys.

    ).returns(MatchedDataForAlarm.class)
     .keyBy(MatchedDataForAlarm::getStateKey)
     .connect(ruleBroadcastStream)
     .process(new MetricProcess())
     .addSink(kinesis);

"MetricProcess" extends "KeyedBroadcastProcessFunction" and overrides the "processElement" function. It uses collector.collect() for its outputs.

    @Override
    public void processElement(MatchedDataForAlarm input, ReadOnlyContext ctx, Collector<MatchedDataForAlarm> collector) throws Exception {

We have our own AEMMatchedDataForAlarmSchemaSerialization, which implements KinesisSerializationSchema<MatchedDataForAlarm>. The serialization simply converts a "MatchedDataForAlarm" object to a String using Gson and then converts that String to a ByteBuffer.
    @Override
    public ByteBuffer serialize(MatchedDataForAlarm matchedDataForAlarm) {
        Gson gson = new Gson();
        String result = gson.toJson(matchedDataForAlarm);
        log.info("Alarm record sent to Kinesis stream: {}", result);
        return ByteBuffer.wrap(result.getBytes());
    }

*Here's the record shown in the Lambda logs when two records are combined somewhere somehow (in most cases those two are received as two separate records):*

    ???? 0?? { "inAlarmState": false } ?? { "inAlarmState": false} e????E????o?N9x

I am not sure if it's a serialization issue or some default behavior in the Kinesis sink library. It might just be a common mistake on my side that I am not aware of.

Could anyone help with this problem? I really appreciate it.

Thanks,
Zekun

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
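One possibility worth ruling out, stated here as an assumption rather than a confirmed diagnosis: the Flink Kinesis producer is built on the Kinesis Producer Library (KPL), which can aggregate several user records into one Kinesis record. An aggregated record begins with the four magic bytes 0xF3 0x89 0x9A 0xC2, continues with a protobuf payload, and ends with a 16-byte MD5 digest. That layout would be consistent with the sample above, where unreadable bytes surround the two JSON documents. The sketch below is a prefix-only heuristic for spotting such a record on the consumer side. The class name KplAggregationCheck and the method looksAggregated are hypothetical, and the test record built in main is a synthetic stand-in, not real KPL output.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Flink or the KPL.
final class KplAggregationCheck {

    // Magic prefix the KPL places at the start of an aggregated record.
    private static final byte[] KPL_MAGIC = {
        (byte) 0xF3, (byte) 0x89, (byte) 0x9A, (byte) 0xC2
    };

    // Heuristic only: checks the magic prefix and does not verify
    // the trailing MD5 digest or parse the protobuf payload.
    static boolean looksAggregated(ByteBuffer data) {
        ByteBuffer view = data.duplicate(); // leave the caller's position untouched
        if (view.remaining() < KPL_MAGIC.length) {
            return false;
        }
        for (byte expected : KPL_MAGIC) {
            if (view.get() != expected) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        byte[] json = "{\"inAlarmState\": false}".getBytes(StandardCharsets.UTF_8);

        // Synthetic stand-in: magic bytes + payload + 16 placeholder digest bytes.
        ByteBuffer fake = ByteBuffer.allocate(KPL_MAGIC.length + json.length + 16);
        fake.put(KPL_MAGIC).put(json).put(new byte[16]);
        fake.flip();

        System.out.println("plain JSON flagged:  " + looksAggregated(ByteBuffer.wrap(json)));
        System.out.println("synthetic flagged:   " + looksAggregated(fake));
    }
}
```

If the check fires on the bytes the Lambda receives, two directions seem plausible: deaggregate the records in the consumer, or turn aggregation off in the producer by setting the KPL property "AggregationEnabled" to "false" in the Properties passed to the Kinesis sink. Whether that property is honored by the connector version in use here is an assumption that should be verified against its documentation.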