laijunlin_data 2025-01-06 16:30 采纳率: 80%
浏览 24
已采纳

flink读取kafka,自动重启后不往mysql写入数据了

#遇到问题的现象描述
flink读取kafka数据,写入mysql,程序运行一段时间后报错,之后自动重启。出现问题:flink任务自动重启后,kafka还是正常消费,但是不往数据库写数据了。
#问题相关代码,报错内容

org.apache.flink.util.FlinkException: Scheduler is being stopped.
    at org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:607) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:962) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:926) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:398) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:214) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:563) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:186) ~[streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) [streampark-flinkjob_todolist_to_sgsmart.jar:?]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) [streampark-flinkjob_todolist_to_sgsmart.jar:?]

#我的初步解答思路
查询mysql的错误连接,发现没有。可能是flink任务还能正常消费kafka的数据,但是已经不去获取mysql的连接往数据库写数据了
#操作环境。软件版本等相关信息
flink 1.13.6

  • 写回答

2条回答 默认 最新

  • 檀越@新空间 2025-01-06 16:35
    关注

    下午好🌅🌅🌅
    本答案参考通义千问

    根据你的描述,Flink任务在重启后不再向MySQL写入数据,尽管它仍然能够正常消费Kafka的数据。以下是一些可能的原因和解决方案:

    可能的原因

    1. 连接池耗尽:长时间运行的任务可能会耗尽数据库连接池中的连接。
    2. 状态恢复问题:Flink在重启时可能无法正确恢复其状态。
    3. 异常处理不当:在处理MySQL写入时,异常可能未被正确捕获和处理。
    4. 配置问题:某些配置参数可能导致连接问题。

    解决方案

    1. 检查连接池配置

      • 确保连接池大小足够大,并且有适当的超时设置。
      • 使用连接池库如HikariCP或C3P0。
    2. 增加重试机制

      • 在写入MySQL时增加重试逻辑,以应对短暂的连接问题。
    3. 持久化状态

      • 确保Flink的状态可以正确恢复,特别是在任务重启时。
    4. 优化异常处理

      • 捕获并记录所有可能的异常,确保异常不会导致任务停止。
    5. 检查日志

      • 查看Flink和MySQL的日志文件,寻找更多关于问题的信息。

    具体代码示例

    假设你使用的是Flink SQL来消费Kafka数据并写入MySQL,可以尝试以下改进:

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.streaming.connectors.jdbc.JdbcConnectionOptions;
    import org.apache.flink.streaming.connectors.jdbc.JdbcSink;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Properties;
    
    public class FlinkKafkaToMySQL {
        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "test");
    
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                    "your-topic",
                    new SimpleStringSchema(),
                    properties);
    
            DataStream<String> stream = env.addSource(consumer);
    
            stream.addSink(JdbcSink.sink(
                    "INSERT INTO your_table (column1, column2) VALUES (?, ?)",
                    (ps, t) -> {
                        ps.setString(1, t.substring(0, t.indexOf(",")));
                        ps.setString(2, t.substring(t.indexOf(",") + 1));
                    },
                    JdbcConnectionOptions.builder()
                            .withUrl("jdbc:mysql://localhost:3306/your_database")
                            .withDriverName("com.mysql.cj.jdbc.Driver")
                            .withUsername("your_username")
                            .withPassword("your_password")
                            .build()
            ));
    
            env.execute("Flink Kafka to MySQL Example");
        }
    }
    

    改进点

    1. 连接池配置

      • 使用HikariCP或其他连接池库来管理数据库连接。
    2. 重试机制

      • JdbcSink中增加重试逻辑。
    3. 异常处理

      • 增加异常捕获和日志记录。
    4. 持久化状态

      • 确保Flink状态可以正确恢复。

    通过以上改进,你可以更好地处理Flink任务重启后无法写入MySQL的问题。

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

问题事件

  • 已采纳回答 5月30日
  • 修改了问题 1月6日
  • 创建了问题 1月6日