FLINK JDBC SQL Connector遇到的类型转换问题

背景

Flink 1.3
最近在写Flink Sql的时候,遇到了java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long 问题

分析

直接上报错的sql,如下:

CREATE TABLE `xxx` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
  
);

具体的问题堆栈如下:

java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long
	at org.apache.flink.table.data.GenericRowData.getLong(GenericRowData.java:154)
	at JoinTableFuncCollector$9.collect(Unknown Source)
	at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
	at LookupFunction$4$TableFunctionResultConverterCollector$2.collect(Unknown Source)
	at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
	at org.apache.flink.connector.jdbc.table.JdbcRowDataLookupFunction.eval(JdbcRowDataLookupFunction.java:175)
	at LookupFunction$4.flatMap(Unknown Source)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:81)
	at org.apache.flink.table.runtime.operators.join.lookup.LookupJoinRunner.processElement(LookupJoinRunner.java:34)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at StreamExecCalc$67.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.table.runtime.util.StreamRecordCollector.collect(StreamRecordCollector.java:44)
	at org.apache.flink.table.runtime.collector.TableFunctionCollector.outputResult(TableFunctionCollector.java:68)
	at StreamExecCorrelate$27$TableFunctionCollector$20.collect(Unknown Source)
	at org.apache.flink.table.runtime.collector.WrappingCollector.outputResult(WrappingCollector.java:39)
	at StreamExecCorrelate$27$TableFunctionResultConverterCollector$25.collect(Unknown Source)
	at org.apache.flink.table.functions.TableFunction.collect(TableFunction.java:196)
	at xxx.xxx.xxxFunction.eval(BinlogParserFunction.java:56)
	at StreamExecCorrelate$27.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at StreamExecCalc$17.processElement(Unknown Source)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:322)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:426)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordsWithTimestamps(AbstractFetcher.java:365)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:212)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaFetcher.runFetchLoop(KafkaFetcher.java:154)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.runWithPartitionDiscovery(FlinkKafkaConsumerBase.java:836)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:828)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:104)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:60)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:269)

其实这是mysql和flink SQL在做字段类型映射的时候,会出现的类型匹配问题:
查看一下对应mysql字段如下:

+-----------------------+---------------------+------+-----+---------+----------------+
| Field                 | Type                | Null | Key | Default | Extra          |
+-----------------------+---------------------+------+-----+---------+----------------+
| merchantId            | int(11) unsigned    | NO   | PRI | NULL    | auto_increment |
| userId                | bigint(20)          | NO   | MUL | NULL    |                |
| status                | tinyint(1) unsigned | NO   |     | 1       |                |

再参考Flink Data Type Mapping,如下:

MySQL typeFlink SQL type
INT UNSIGNEDBIGINT
BIGINTBIGINT
TINYINTTINYINT
INTINT

注意: 对于MYSQL类型是INT UNSIGNED 的字段,映射到FLINk应该为BIGINT类型,而不是INT类型

否则会报如下错误:

java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer

解决

把对一个的类型修改一下可以了:

CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  BIGINT
) WITH (
  
);
           ||
           \/
CREATE TABLE `dim_merchant` (
  `merchantId`  BIGINT,
  `userId`  BIGINT,
  `status`  INT
) WITH (
  
);
### 回答1: FlinkJDBC连接器是一个用于在Flink流处理作业中与外部数据库进行交互的重要组件。这个连接器可以连接到多种数据库,其中包括Oracle数据库。 Oracle是一个流行的关系型数据库管理系统,广泛用于企业级应用和大规模数据存储。FlinkJDBC连接器通过使用Oracle JDBC驱动程序,在Flink作业中无缝集成了对Oracle数据库的访问和查询功能。 FlinkJDBC连接器支持Oracle数据库的各种操作,包括数据读取、数据写入和数据更新。使用FlinkJDBC连接器,可以轻松地从Oracle数据库中读取数据,进行流处理和分析,然后将结果写回到数据库中。 在Flink作业中使用JDBC连接器与Oracle数据库进行交互是相对简单的。只需要在Flink的作业配置文件中指定Oracle数据库的连接信息和表信息,并指定要执行的SQL语句或查询语句。FlinkJDBC连接器会根据配置信息建立与Oracle数据库的连接,并执行指定的操作。 总而言之,FlinkJDBC连接器是一个强大的工具,可以将Flink的流处理和分析能力与Oracle数据库的数据存储和查询功能结合起来。使用FlinkJDBC连接器,可以轻松地实现与Oracle数据库的集成,并实现复杂的数据处理和分析任务。 ### 回答2: Flink JDBC Connector 是 Apache Flink 框架的一个重要组件,它用于将 Flink 应用程序与关系型数据库进行连接和交互。而Oracle是一种功能强大且广泛使用的商业关系型数据库管理系统,所以可以肯定地说,Flink JDBC Connector 是支持连接和操作 Oracle 数据库的。 使用 Flink JDBC Connector 可以方便地在 Flink 应用程序中读取和写入 Oracle 数据库的数据。对于读取数据,可以通过指定查询语句来从 Oracle 数据库中提取数据,并将其转换为 Flink DataStream 或 Table 进行进一步处理和分析。对于写入数据,可以将 Flink 应用程序的计算结果直接插入到 Oracle 数据库中的指定表中。 Flink JDBC Connector 提供了与 Oracle 数据库交互所需的 JDBC 驱动程序,并具有处理数据库连接管理、事务管理等方面的能力。另外,Flink JDBC Connector 还支持将查询结果批量写入或者批量读取,以提高数据处理的效率和性能。 在使用 Flink JDBC Connector 连接 Oracle 数据库时,我们需要配置连接参数,包括数据库的 URL、用户名、密码等,并根据需要指定要执行的 SQL 查询语句或插入数据的表名。通过合理配置这些参数,Flink 应用程序可以轻松地与 Oracle 数据库进行数据交互。 总之,Flink JDBC Connector 是支持连接和操作 Oracle 数据库的,它为 Flink 应用程序提供了与 Oracle 数据库交互的便利性和灵活性,使得我们可以方便地在大数据处理中使用 Oracle 数据库。 ### 回答3: 是的,Flink JDBC连接器支持与Oracle数据库的集成。Flink提供了适用于Oracle的JDBC连接器,可以通过该连接器将Flink与Oracle数据库连接起来,并在Flink作业中读取和写入Oracle数据库的数据。 使用Flink JDBC连接器与Oracle集成非常简单。首先,我们需要在Flink作业中设置Oracle数据库的连接URL、用户名和密码等连接参数。然后,可以通过FlinkJDBCSourceFunction从Oracle数据库中读取数据,并将其转换为流数据进行进一步的处理和计算。另外,也可以使用FlinkJDBCSinkFunction将流数据写入到Oracle数据库中。 在与Oracle集成时,FlinkJDBC连接器提供了对Oracle特定的数据类型和功能的支持。它可以处理Oracle的数值、字符串、日期、时间戳等常见数据类型,并且支持Oracle的事务和批处理操作。此外,Flink还提供了对Oracle的连接池管理和数据分片等功能,以提高性能和可伸缩性。 总之,Flink JDBC连接器可以很方便地与Oracle数据库集成,实现在Flink作业中读取和写入Oracle数据库的功能。无论是实时数据流处理还是批处理作业,都可以通过Flink与Oracle进行无缝集成,以实现各种数据处理和分析的需求。
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值