laijunlin_data 2025-01-06 16:35

Flink CDC reading from SQL Server: error when starting from a checkpoint

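#Description of the problem
A Flink CDC job that reads from SQL Server fails with the error below when it is started from a checkpoint.
#Relevant code and error output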

java.lang.AbstractMethodError: Method com/ververica/cdc/debezium/internal/FlinkDatabaseSchemaHistory.recover(Ljava/util/Map;Lio/debezium/relational/Tables;Lio/debezium/relational/ddl/DdlParser;)V is abstract
    at com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory.recover(FlinkDatabaseSchemaHistory.java) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.relational.history.DatabaseHistory.recover(DatabaseHistory.java:163) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:62) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:87) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:760) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]
2024-12-25 18:24:32,094 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - Calculating tasks to restart to recover the failed task e5a72f353fc1e6bbf3bd96a41384998c_0.
2024-12-25 18:24:32,095 INFO  org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy [] - 3 tasks should be restarted to recover the failed task e5a72f353fc1e6bbf3bd96a41384998c_0. 
2024-12-25 18:24:32,095 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job starlims_cdc_to_kafka (f062434f433b76e9bfb48979caba4de9) switched from state RUNNING to RESTARTING.
2024-12-25 18:24:32,095 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map -> Sink: sink (1/2) (8a0778a45103266d6e5fec12e59173f1) switched from RUNNING to CANCELING.
2024-12-25 18:24:32,095 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map -> Sink: sink (2/2) (c42c4c2f3881e0eb35ec767ad22d22f7) switched from RUNNING to CANCELING.
2024-12-25 18:24:32,134 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map -> Sink: sink (1/2) (8a0778a45103266d6e5fec12e59173f1) switched from CANCELING to CANCELED.
2024-12-25 18:24:32,144 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Map -> Sink: sink (2/2) (c42c4c2f3881e0eb35ec767ad22d22f7) switched from CANCELING to CANCELED.
2024-12-25 18:24:32,154 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: source_se -> Process (1/1) (a00e53108e1f85a387ae2f46db06d658) switched from INITIALIZING to RUNNING.
2024-12-25 18:24:32,241 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Source: source_se -> Process (1/1) (a00e53108e1f85a387ae2f46db06d658) switched from RUNNING to FAILED on container_e86_1735037911994_0019_01_000002 @ cnap422 (dataPort=35807).
java.lang.AbstractMethodError: Method com/ververica/cdc/debezium/internal/FlinkDatabaseSchemaHistory.recover(Ljava/util/Map;Lio/debezium/relational/Tables;Lio/debezium/relational/ddl/DdlParser;)V is abstract
    at com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory.recover(FlinkDatabaseSchemaHistory.java) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.relational.history.DatabaseHistory.recover(DatabaseHistory.java:163) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.relational.HistorizedRelationalDatabaseSchema.recover(HistorizedRelationalDatabaseSchema.java:62) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.connector.sqlserver.SqlServerConnectorTask.start(SqlServerConnectorTask.java:87) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:133) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.embedded.EmbeddedEngine.run(EmbeddedEngine.java:760) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at io.debezium.embedded.ConvertingEngineBuilder$2.run(ConvertingEngineBuilder.java:192) ~[streampark-flinkjob_starlims_cdc_to_kafka.jar:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
    at java.lang.Thread.run(Thread.java:750) ~[?:1.8.0_372]

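#My initial thoughts
This is probably a version conflict between Flink CDC and Debezium, but I don't know how to resolve it.
#Environment and software versions
Flink 1.13.6, with the following dependencies: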
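
One way to test the version-conflict suspicion is to list every classpath location that provides FlinkDatabaseSchemaHistory. More than one location suggests that two connector versions are competing for the same class name. The following is a minimal sketch, assuming it is run on the unshaded project classpath (inside a single fat jar only one copy survives, so the result there would always be one). ClasspathCopies and its locate method are hypothetical names used only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.Collections;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink CDC or Debezium.
public class ClasspathCopies {

    /** Returns every classpath location that provides the given class file. */
    static List<URL> locate(String className, ClassLoader loader) {
        String resource = className.replace('.', '/') + ".class";
        try {
            return Collections.list(loader.getResources(resource));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Default to the class named in the stack trace.
        String name = args.length > 0
                ? args[0]
                : "com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory";
        List<URL> copies = locate(name, ClasspathCopies.class.getClassLoader());
        System.out.println(copies.size() + " location(s) provide " + name);
        for (URL url : copies) {
            System.out.println("  " + url);
        }
    }
}
```

Running `mvn dependency:tree` on the project gives a complementary view, since it shows which artifacts pull in each Debezium and Flink CDC version.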

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-sqlserver-cdc</artifactId>
            <version>3.0.0</version>
        </dependency>
        <dependency>
            <groupId>io.debezium</groupId>
            <artifactId>debezium-embedded</artifactId>
            <version>1.9.7.Final</version>
            <exclusions>
                <exclusion>
                    <artifactId>kafka-clients</artifactId>
                    <groupId>org.apache.kafka</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version>
        </dependency>

1 answer

  • 道友老李 2025-01-06 16:40
    This answer was written with reference to the Tongyi Qianwen large language model.
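    #Problem analysis
    The error is java.lang.AbstractMethodError: at run time, the loaded FlinkDatabaseSchemaHistory class has no implementation of recover(Map, Tables, DdlParser), which the Debezium DatabaseHistory interface on the classpath declares. An abstract method has no body, so the call fails. Because FlinkDatabaseSchemaHistory ships with Flink CDC rather than with the job code, this error normally means the class was compiled against a different Debezium version than the one loaded at run time.
    #Solution
    The FlinkDatabaseSchemaHistory class that gets loaded must implement the recover method declared by the Debezium version in use. The fragment below only illustrates the method involved; its body is a placeholder.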
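    import java.util.Map;
    import io.debezium.relational.Tables;
    import io.debezium.relational.ddl.DdlParser;
    import io.debezium.relational.history.DatabaseHistory;

    // Fragment: the other DatabaseHistory methods are omitted, hence abstract.
    public abstract class FlinkDatabaseSchemaHistory implements DatabaseHistory {
        // The three-argument recover named in the error (Map, Tables, DdlParser).
        @Override
        public void recover(Map<Map<String, ?>, Map<String, ?>> offsets, Tables schema, DdlParser ddlParser) {
            // Placeholder only: a real implementation must rebuild the table schema.
            System.out.println("Recovering database schema history");
        }
    }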
    

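    Once the loaded FlinkDatabaseSchemaHistory implements recover, the AbstractMethodError no longer occurs. In practice that implementation should come from a Flink CDC release built against the Debezium version on the classpath, not from a hand-written copy. The dependency list in the question mixes flink-connector-mysql-cdc 2.2.1 with flink-connector-sqlserver-cdc 3.0.0 and pins debezium-embedded 1.9.7.Final, so aligning the two connector versions is the first thing to try.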
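
To confirm the diagnosis before changing versions, reflection can show where a class was loaded from and which of its public methods are still abstract. The following is a rough sketch using only the JDK; AbstractMethodCheck, unimplemented and origin are hypothetical names, and the default class name is taken from the stack trace in the question.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Flink CDC or Debezium.
public class AbstractMethodCheck {

    /** Lists the public methods that are still abstract on the given class. */
    static List<String> unimplemented(Class<?> cls) {
        List<String> missing = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (Modifier.isAbstract(m.getModifiers())) {
                missing.add(m.toString());
            }
        }
        return missing;
    }

    /** Returns the jar or directory the class was loaded from, or "unknown". */
    static String origin(Class<?> cls) {
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "unknown";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) {
        String name = args.length > 0
                ? args[0]
                : "com.ververica.cdc.debezium.internal.FlinkDatabaseSchemaHistory";
        try {
            // Load without running static initializers.
            Class<?> cls = Class.forName(name, false, AbstractMethodCheck.class.getClassLoader());
            System.out.println("Loaded from: " + origin(cls));
            for (String m : unimplemented(cls)) {
                System.out.println("Still abstract: " + m);
            }
        } catch (ClassNotFoundException e) {
            System.out.println("Not on the classpath: " + name);
        }
    }
}
```

For a concrete class, any line printed as "Still abstract" points to an interface method the class was not compiled against, which is the situation an AbstractMethodError reports.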
