Simon Tao created FLINK-22089:
---------------------------------

             Summary: cdc checkpoint invalid
                 Key: FLINK-22089
                 URL: https://issues.apache.org/jira/browse/FLINK-22089
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Common, Runtime / Checkpointing
    Affects Versions: 1.12.0
         Environment:

public static void main(String[] args) {
    SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
            .hostname("xxx")
            .port(3306)
            .databaseList("xxx") // monitor all tables under inventory database
            .username("xxx")
            .password("xxx")
            .deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String
            .build();

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // checkpoint interval in milliseconds

    try {
        env.setStateBackend(new RocksDBStateBackend("file:///E:/tmp/t2"));
    } catch (IOException e) {
        e.printStackTrace();
    }

    env.addSource(sourceFunction)
            .print()
            .setParallelism(1); // use parallelism 1 for sink to keep message ordering

    try {
        env.execute();
    } catch (Exception e) {
        e.printStackTrace();
    }
}

====================maven=========================

<build>
    <plugins>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <appendAssemblyId>false</appendAssemblyId>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>assembly</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <configuration>
                <source>8</source>
                <target>8</target>
            </configuration>
        </plugin>
    </plugins>
</build>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-jdbc -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-test-utils -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-test-utils_${scala.version}</artifactId>
        <version>${flink.version}</version>
        <!--<scope>test</scope>-->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-statebackend-rocksdb_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.alibaba.ververica/flink-connector-mysql-cdc -->
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.3.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/ru.ivi.opensource/flink-clickhouse-sink -->
    <dependency>
        <groupId>ru.ivi.opensource</groupId>
        <artifactId>flink-clickhouse-sink</artifactId>
        <version>1.3.0</version>
    </dependency>
</dependencies>

            Reporter: Simon Tao
             Fix For: shaded-13.0


I enabled checkpointing, but it seems to have no effect. During the whole run, the checkpoint files are written to the local filesystem and Flink reads the CDC records normally. However, when I restart the Flink job in IntelliJ IDEA, it always reloads the records it has already consumed. My code and Maven configuration are pasted in the Environment field.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
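
For context on the restart behaviour described above: a job started again from the IDE is submitted as a new job, and Flink does not resume from an earlier checkpoint unless a restore path is supplied, for example with `-s` on the command line or the `execution.savepoint.path` option. The sketch below is a hypothetical helper, not part of Flink or the CDC connector, that looks for the newest completed checkpoint in one job's checkpoint directory. It assumes the usual `<checkpoint-dir>/<job-id>/chk-<n>/_metadata` layout. The class and method names are made up for illustration.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;

/** Hypothetical helper for illustration; not a Flink or CDC connector API. */
final class LatestCheckpointFinder {

    // Assumption: checkpoint directories are named chk-<number>.
    private static final Pattern CHK_DIR = Pattern.compile("chk-(\\d+)");

    private LatestCheckpointFinder() {}

    /**
     * Returns the chk-<n> directory with the highest n that contains a
     * _metadata file, or an empty Optional if there is none.
     *
     * @param jobCheckpointDir the <checkpoint-dir>/<job-id> directory of one job
     */
    static Optional<Path> findLatest(Path jobCheckpointDir) throws IOException {
        if (!Files.isDirectory(jobCheckpointDir)) {
            return Optional.empty();
        }
        try (Stream<Path> children = Files.list(jobCheckpointDir)) {
            return children
                    // keep only chk-<n> directories
                    .filter(p -> CHK_DIR.matcher(p.getFileName().toString()).matches())
                    // skip checkpoints that have no metadata file
                    .filter(p -> Files.isRegularFile(p.resolve("_metadata")))
                    .max(Comparator.comparingLong(LatestCheckpointFinder::checkpointId));
        }
    }

    // Extracts the numeric part of a chk-<n> directory name.
    private static long checkpointId(Path chkDir) {
        Matcher m = CHK_DIR.matcher(chkDir.getFileName().toString());
        return m.matches() ? Long.parseLong(m.group(1)) : -1L;
    }
}
```

The returned directory could then be given as the restore path, in the form file:///E:/tmp/t2/<job-id>/chk-<n>, where <job-id> and <n> depend on the earlier run. Whether doing so resolves the behaviour in this report is not confirmed here.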