本周在技术上做了以下几件事情:
- 学习java8后新的日期时间api
参考:LocalDateTime用法(jdk1.8 )
写一下自己的理解,LocateDateTime是按本地时区生成的时间,Instant是标准时间(即格林尼治时间),它们之间相互转换需要提供时区信息(ZoneId或Clock)。同时要注意新api对象都是不可变对象。 - Flink Sql 向Mysql写入数据时发现莫名删除历史数据问题解决
原因:如果某个key对应的数据在state中不存在了,会生成delete事件,从而触发mysql删除该条数据,找到源码如下
@Override
protected void addToBatch(RowData original, RowData extracted) throws SQLException {
switch (original.getRowKind()) {
case INSERT:
case UPDATE_AFTER:
super.addToBatch(original, extracted);
break;
case DELETE:
case UPDATE_BEFORE:
// deleteExecutor.addToBatch(extracted);
break;
default:
throw new UnsupportedOperationException(
String.format("unknown row kind, the supported row kinds is: INSERT, UPDATE_BEFORE, UPDATE_AFTER," +
" DELETE, but get: %s.", original.getRowKind()));
}
}
解决方法:注释掉deleteExecutor.addToBatch(extracted);这一行