目录
背景
2019年,基于“图形”(Graph)的存储、分析相关技术被Gartner评为十大数据与分析技术之一。据说基于“图”实现的存储可以进行高效的建模,进而可以探索与查询具有相互复杂关系的数据。而后研究“图”这类数据以及相关技术就成了是我的一个兴趣爱好。
但随着不断的深入,渐渐发现这种数据结构可以解决很多问题。例如,不同类型的数据如何联系在一起,如何根据这些联系进行数据挖掘等等。这些数据特征不一,在实际应用中会选用不同数据库进行管理。但是实际上这些数据在真实世界中往往有千丝万缕的关系。所以我着手研究“图”在其中的作用。
在我的工作内容中,需要处理不同类型的数据,例如:时序数据,三维图形数据,和业务对象等。从产品的角度出发,我需要将它们三者统一起来,进行存储,为数据挖掘和知识沉淀建立基础。
总体设计
平台架构
平台总体采用Spring Cloud的应用架构,主要选型:
spring cloud:2020.0.2
spring boot:2.4.3
mybatis:2.1.2
关于如何在idea创建Spring cloud项目和使用MyBatis自动生成代码可以分别参考以下两篇blogs
https://blog.csdn.net/sword_csdn/article/details/104961624
https://blog.csdn.net/sword_csdn/article/details/124607699
图平台需要促进不同类型数据的融合,并进行分析。所以整个平台的总体思路包含两个部分:数据汇聚和数据分析。
数据汇聚
要获得有意义的分析结果,则需要融合大量数据。如何对数据进行有效地组织和管理,是数据挖掘和分析的前提。所以在数据“入库”之前,我们可以提供一个定义对象的功能。使得数据入库之后以预先定义的规则来管理数据。
对象定义,schema_manager
“对象定义”模块概述
我们需要定义一个对象的抽象概念,所谓对象,往往对应真实世界中一个客观实体,不仅如此,对象还包含实体之间的关系。总而言之,对象包含客观世界中的“实体”,以及实体之间的“关系”。当平台具备了定义它们的能力时,理论上便已经具备了容纳所有数据的能力了。因为几乎所有的数据,都是围绕客观对象产生的。
具体而言,平台是给用户提供了一个自定义对象的机制。这里可以称为“对象定义”。例如:我们需要汇聚一份传感系统数据,我们就需要定义传感系统里都包含哪些类型的传感终端,这些终端是使用哪些属性来描述的。
此外,还有与终端相连接的“传感服务器A”(假设有),以及它们之间的电路连接,这些可以在对象定义中加以定义。
定义完成后,我们就可以在“对象管理”模块(后续会说明),按照定义来添加数据。
“对象定义”模块实现
目录树
用户可以基于目录树的方式管理对象定义。
目录树的功能可以基于Hutool的TreeUtils功能实现(详情可访问https://www.hutool.cn/),这里有关目录树的表结构创建对应的SQL语句可以参考。
DROP TABLE IF EXISTS "public"."schema_catalog";
CREATE TABLE "public"."schema_catalog" (
"id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"token" varchar(255) COLLATE "pg_catalog"."default",
"name" varchar(255) COLLATE "pg_catalog"."default",
"parentid" varchar(255) COLLATE "pg_catalog"."default",
"level" int8,
"createtime" date,
"updatetime" date,
"pos" int8
);
COMMENT ON COLUMN "public"."schema_catalog"."id" IS '主键id';
COMMENT ON COLUMN "public"."schema_catalog"."token" IS '该节点所属标识,一般是角色';
COMMENT ON COLUMN "public"."schema_catalog"."name" IS '节点名称';
COMMENT ON COLUMN "public"."schema_catalog"."parentid" IS '父节点id';
COMMENT ON COLUMN "public"."schema_catalog"."level" IS '节点层级';
COMMENT ON COLUMN "public"."schema_catalog"."createtime" IS '创建时间';
COMMENT ON COLUMN "public"."schema_catalog"."updatetime" IS '更新时间';
COMMENT ON COLUMN "public"."schema_catalog"."pos" IS '排序';
-- ----------------------------
-- Primary Key structure for table schema_catalog
-- ----------------------------
ALTER TABLE "public"."schema_catalog" ADD CONSTRAINT "schema_catalog_pkey" PRIMARY KEY ("id");
相关新增,修改,删除等接口可按需实现。而值得记录的是关于级联删除的方法。我这里在对应schema_catalog数据表的MyBaties的Mapper XMl文件中自定义了该方法的SQL语句,如下所示:
<delete id="deleteAllChildren">
with ctl as (
delete
from schema_catalog
where id in (select c1.id
from schema_catalog c1
where c1.id = #{id}
and c1.token = #{token}
union all
select c2.id
from schema_catalog c2
inner join schema_catalog c1 on c1.id = c2.parentid) returning *),
obj as (
delete
from schema_object
where catalog_id in (select id from ctl) returning *)
delete
from schema_field
where object_id in (select id from obj)
</delete>
而关于级联查询的SQL语句如下:
<select id="selectAllChildren" resultType="com.southsmart.sgeocserver.schema.entity.SchemaCatalog">
with recursive ctl as (select c1.id, c1.name, c1.parentid, c1.level, c1.createtime, c1.updatetime, c1.pos
from schema_catalog c1
where c1.id = #{id}
and c1.token = #{token}
union all
select c2.id, c2.name, c2.parentid, c2.level, c2.createtime, c2.updatetime, c2.pos
from schema_catalog c2
inner join ctl c3 on c3.id = c2.parentid)
select *
from ctl
</select>
对象定义
对象定义的实现比较简单,有两个表,一个是对象表schema_object,一个是对象属性表schema_field,它们的表结构SQL分别是:
schema_object
-- ----------------------------
-- Table structure for schema_object
-- ----------------------------
DROP TABLE IF EXISTS "public"."schema_object";
CREATE TABLE "public"."schema_object" (
"id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"catalog_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"name_cn" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"type" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"token" varchar(255) COLLATE "pg_catalog"."default",
"createtime" date,
"updatetime" date
)
;
COMMENT ON COLUMN "public"."schema_object"."id" IS '数据记录唯一标识';
COMMENT ON COLUMN "public"."schema_object"."catalog_id" IS '所属目录id';
COMMENT ON COLUMN "public"."schema_object"."name" IS '对象名称,用于创建schema';
COMMENT ON COLUMN "public"."schema_object"."name_cn" IS '对象中文名称';
COMMENT ON COLUMN "public"."schema_object"."type" IS '对象类型。0:实体对象,1:关系对象';
COMMENT ON COLUMN "public"."schema_object"."token" IS '权限设置';
COMMENT ON COLUMN "public"."schema_object"."createtime" IS '创建时间';
COMMENT ON COLUMN "public"."schema_object"."updatetime" IS '更新时间';
-- ----------------------------
-- Primary Key structure for table schema_object
-- ----------------------------
ALTER TABLE "public"."schema_object" ADD CONSTRAINT "schema_object_pkey" PRIMARY KEY ("id");
其中catalog_id属性对应schema_catalog表的id属性(这里不用外键)
schema_field
-- ----------------------------
-- Table structure for schema_field
-- ----------------------------
DROP TABLE IF EXISTS "public"."schema_field";
CREATE TABLE "public"."schema_field" (
"id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"object_id" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"name" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"name_cn" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"type" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"len" varchar(255) COLLATE "pg_catalog"."default" NOT NULL,
"index_yn" bit(1)
)
;
COMMENT ON COLUMN "public"."schema_field"."id" IS '属性唯一标识';
COMMENT ON COLUMN "public"."schema_field"."object_id" IS '属性所属抽象对象';
COMMENT ON COLUMN "public"."schema_field"."name" IS '属性名称,用于创建schema';
COMMENT ON COLUMN "public"."schema_field"."name_cn" IS '属性中文名称';
COMMENT ON COLUMN "public"."schema_field"."type" IS '属性类型';
COMMENT ON COLUMN "public"."schema_field"."len" IS '属性值长度';
COMMENT ON COLUMN "public"."schema_field"."index_yn" IS '是否需要索引';
-- ----------------------------
-- Primary Key structure for table schema_field
-- ----------------------------
ALTER TABLE "public"."schema_field" ADD CONSTRAINT "schema_field_pkey" PRIMARY KEY ("id");
其中object_id对应schema_object中的id属性(这里不用外键)
“对象定义”与三维数据(图形数据)的关系
三维空间数据目前的组织和应用现状
由于本人也从事了三维数据的研究,所以在做数据治理的时候不得不思考三维数据(倾斜摄影,点云,BIM)如何融入到整个数据体系中。
目前三维数据的组织和应用总体可以基于上图来描述,三维数据的核心是三维网格(还有纹理,材质这些用于可视化层面的要素),一份三维数据经常以二进制方式同时存储着这些可视化要素和结构化属性信息。这种数据的组织方式存在一定的问题。
(1)结构化属性和三维可视化要素存储在一份数据中,导致数据过于庞大,不利于数据共享
(2)结构化属性之间是离散的,不利于数据分析,例如上图中的头部和手臂,它们在现实生活中是存在生理结构上的关系的。
优化后的数据组织和结构
实际上我们做数据治理的过程本质上都是在梳理数据与数据之间的关系。但是不管是怎样的数据,其本质都是围绕“实体”产生的,或是围绕实体产生的“事件”,或是“实体”产生的行为,又或是“实体”对外的表现。而我认为三维空间数据恰是实体的表象。
所以我们可以首先将结构化的属性数据和三维图形数据分开,在属性一侧使用“图”来组织。我们使用“对象定义”的功能预先定义好具体的对象,例如我们会先定义“头部”,“手臂”,“腿部”等等对象定义。然后根据定义好的属性进行赋值。
在将属性部分单独出来管理和存储之后,我们便可以仅就结构化的属性信息进行数据分析和挖掘。同时解决了数据量大时,访问数据缓慢的问题。
但是将三维图形与结构化属性分开之后,紧接着我们便要思考如何重新将属性和三维数据关联起来的问题。这里采用的做法是在“对象定义”时,新增一个特殊类型的的属性,这个类型表明该属性是存储了相关三维图形信息的“oblique_photography”。这类特殊的类型在选择后会允许在其“内部”继续定义属性,来具体表达这个特殊的类型。如下所示:
例如:如上图所示,我们可以在oblique_photography这个类型的内部又定义了两个属性,addr和geojson。addr表示3维图形数据的地址,geojson表示3维图形数据的局部区域。
关于“对象定义”与实时数据流
实时数据大多数由物联网设备产生,我会将实时数据的产生定位为某个具体物联设备的“行为”,不作为描述自身的属性。所以它跟三维空间数据一样,可以考虑存储在符合自身数据特点的存储介质中(例如时序数据库)便可。
关于“对象定义”中的特殊属性类型
“对象定义”模块包含特殊的属性类型,例如:三维倾斜摄影(OBLIQUE_PHOTOGRAPHY),实时数据流(REAL_TIME)等,这些数据类型会在一开始在数据表中定义好。
并在将要创建schema时,进行特殊处理(具体如何处理,下面会讲述)例如:
对象管理,object_manager
“对象管理”模块概述
对象管理,是根据前面对象定义好的数学模型创建,删除,查询,修改里面的实体,以及实体关系。
“对象管理”模块的基本实现
数据空间,dataspace
dataspace原本是Nebula Graph中的概念,这个概念在图平台中可以用于区分不同的业务域,例如在“对象定义”阶段,我们可以定义“人”,“车”,“房子”三种定义,并能在不同的场景选择使用不同的对象定义。
nebula graph中的基本属性 schema 创建
在确定数据空间(业务场景),以及所对应的对象定义之后,便可以创建schema。
我们根据前面的对象定义,拼接出所需要tag,和edge的ngql语句。
public int create_graph_schema(String space_name, List<SchemaObjectVO> objects) {
Session session;
List<String> ngqls = new ArrayList<>();
ngqls.add(String.format("USE `%s`", space_name));
List<String> tag_ngql = new ArrayList<>();
List<String> edge_ngql = new ArrayList<>();
List<String> index_ngql = new ArrayList<>();
objects.forEach(item -> {
String objectName = item.getName().trim().replaceAll(" ", "_");
List<SchemaFieldVO> fields = item.getFields();
if (item.getType().equals("0")) {//如果是tag
index_ngql.add(String.format("CREATE TAG INDEX IF NOT EXISTS %s_index ON %s()", objectName, objectName));
List<String> tag_name_types = new ArrayList<>();
fields.forEach(item2 -> {
String fieldName = item2.getName().trim().replaceAll(" ", "_");
String fieldType = TypeConvertor.convertoToNebulaType(item2);
tag_name_types.add(String.format("%s %s", fieldName, fieldType));
if (item2.getIndexYn()) {
index_ngql.add(String.format("CREATE TAG INDEX IF NOT EXISTS %s_%s_index on %s(%s)", objectName, fieldName, objectName, fieldName));
}
});
tag_ngql.add(String.format("CREATE TAG IF NOT EXISTS %s (%s)", objectName, String.join(",", tag_name_types)));
} else if (item.getType().equals("1")) {//如果是edge
index_ngql.add(String.format("CREATE EDGE INDEX IF NOT EXISTS %s_index ON %s()", objectName, objectName));
List<String> edge_name_types = new ArrayList<>();
fields.forEach(item2 -> {
String fieldName = item2.getName().trim().replaceAll(" ", "_");
String fieldType = TypeConvertor.convertoToNebulaType(item2);
if (!(fieldName.toLowerCase().equals("source") || fieldName.toLowerCase().equals("target"))) {
edge_name_types.add(String.format("%s %s", fieldName, fieldType));
if (item2.getIndexYn()) {
index_ngql.add(String.format("CREATE EDGE INDEX IF NOT EXISTS %s_%s_index on %s(%s)", objectName, fieldName, objectName, fieldName));
}
}
});
edge_ngql.add(String.format("CREATE EDGE IF NOT EXISTS %s (%s)", objectName, String.join(",", edge_name_types)));
}
});
ngqls.add(String.join(";", tag_ngql));
ngqls.add(String.join(";", edge_ngql));
ngqls.add(String.join(";", index_ngql));
String ngql = String.join(";", ngqls) + ";";
log.info(String.format("Execute:%s", ngql));
try {
session = pool.getSession(username, password, false);
ResultSet resp = session.execute(ngql);
if (!resp.isSucceeded()) {
log.error(String.format("Execute failed,message:%s", ngql, resp.getErrorMessage()));
return -1;
}
session.release();
return 1;
} catch (Exception e) {
e.printStackTrace();
return -1;
}
}
这里关于Nebula Graph的连接Session,是通过注入Nebula-java客户端的NebulaPool得到。注入NebulaPool的代码如下:
package com.southsmart.sgeocserver.graph.configuration;
import com.vesoft.nebula.client.graph.NebulaPoolConfig;
import com.vesoft.nebula.client.graph.data.HostAddress;
import com.vesoft.nebula.client.graph.net.NebulaPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.ArrayList;
import java.util.List;
@Configuration
public class NebulaPoolConfiguration {
private static final Logger log = LoggerFactory.getLogger(NebulaPoolConfiguration.class);
@Value("${spring.cloud.nebula.hosts-addr}")
private String addr;
@Value("${spring.cloud.nebula.max-connetions}")
private String maxConns;
@Bean
NebulaPool getNebulaPool(){
NebulaPool pool = new NebulaPool();
log.info(String.format("Initializing nebula pool:addresses=%s,max connections=%s",addr,maxConns));
try{
NebulaPoolConfig config = new NebulaPoolConfig();
List<HostAddress> hostAddressList = new ArrayList<>();
if(maxConns!=null&&!maxConns.equals("")){
config.setMaxConnSize(Integer.parseInt(maxConns));
}
if(addr!=null&&!addr.equals("")){
String[] addrs = addr.split(",");
for(int i=0;i<addrs.length;i++){
String[]ipports = addrs[i].split(":");
hostAddressList.add(new HostAddress(ipports[0],Integer.parseInt(ipports[1])));
}
}
Boolean initResult = pool.init(hostAddressList,config);
if(!initResult){
log.error("Nebula pool init failed");
}
}catch (Exception e){
e.printStackTrace();
}
return pool;
}
}
创建完Schema后,可以使用Nebula Studio查看相关的记录。
实体 schema
实体关系 schema
查询索引 schema
新建实体
对象定义的工作完成之后,便可以根据定义进行赋值。每一次赋值需要选择在“对象定义”模块创建好的定义。先预先定义好从前端传过来的数据结构如下:
public class ObjectVO {
String dataspace;//所以使用的数据空间
String name;//对象定义名称
String type;//需要赋值的对象定义
List<FieldsVO> fields;
}
public class FieldsVO {
String name;//属性名称
String value;//属性值
String type;//属性定义类型
List<FieldsVO> children;//特殊属性类型下的属性定义
}
解析上面数据结构成ngql语句并执行。
public int create_graph_object(String space_name, ObjectVO objectVO) {
Session session;
List<String> ngqls = new ArrayList<>();
ngqls.add(String.format("USE %s", space_name));
List<String> vert_ngql = new ArrayList<>();
List<String> edge_ngql = new ArrayList<>();
String objectName = objectVO.getName().trim().replaceAll(" ", "_");
List<FieldsVO> fieldsVOS = objectVO.getFields();
if (objectVO.getType().equals("0")) {
List<String> tag_name_type = new ArrayList<>();
List<String> tag_value = new ArrayList<>();
String vid = "";
for (int i = 0; i < fieldsVOS.size(); i++) {
String fieldName = fieldsVOS.get(i).getName().trim().replaceAll(" ", "_");
if (fieldName.equals("id")) {
vid = fieldsVOS.get(i).getValue();
break;
}
}
fieldsVOS.forEach(item -> {
String fieldName = item.getName().trim().replaceAll(" ", "_");
String fieldType = item.getType();
tag_name_type.add(item.getName());
tag_value.add(TypeConvertor.convertToNebulaValueFormat(item.getType(), item.getValue()));
});
vert_ngql.add(String.format("INSERT VERTEX IF NOT EXISTS %s(%s) VALUES \"%s\"(%s)", objectName, String.join(",", tag_name_type), vid, String.join(",", tag_value)));
}
if (objectVO.getType().equals("1")) {
List<String> edge_name_type = new ArrayList<>();
List<String> edge_value = new ArrayList<>();
String svid = "";
String tvid = "";
int count = 0;
for (int i = 0; i < fieldsVOS.size(); i++) {
String fieldName = fieldsVOS.get(i).getName().trim().replaceAll(" ", "_");
if (fieldName.equals("source")) {
svid = fieldsVOS.get(i).getValue();
}
if (fieldName.equals("target")) {
tvid = fieldsVOS.get(i).getValue();
}
if (count == 2) {
break;
}
}
fieldsVOS.forEach(item -> {
String fieldName = item.getName().trim().replaceAll(" ", "_");
String fieldType = item.getType();
edge_name_type.add(item.getName());
edge_value.add(TypeConvertor.convertToNebulaValueFormat(item.getType(),item.getValue()));
});
edge_ngql.add(String.format("INSERT EDGE IF NOT EXISTS %s(%s) VALUES \"%s\"->\"%s\"(%s)",objectName,String.join(",",edge_name_type),svid,tvid,String.join(",",edge_value)));
}
ngqls.add(String.format(",",vert_ngql));
ngqls.add(String.format(",",edge_ngql));
String ngql = String.format(";",ngqls)+";";
log.info(String.format("Execute:%s", ngql));
try {
session = pool.getSession(username, password, false);
ResultSet resp = session.execute(ngql);
if (!resp.isSucceeded()) {
log.error(String.format("Execute failed,message:%s", ngql, resp.getErrorMessage()));
return -1;
}
session.release();
return 1;
} catch (Exception e) {
e.printStackTrace();
return -1;
}
}
特殊属性类型的处理方法
依照前面所述,一般情况下,特殊属性类型的定义虽然都是在对象定义模块进行,但是真正底层对应的schema不会在nebula 图数据库创建,而是根据类型的不同选择不同的数据库。
三维数据
三维数据本质上是一个个文件,并且单独的文件并不代表一个实体。换言之,我们无法单纯从一份三维数据中直接“裁剪”出得到我们想要的区域(例如我需要从一份广东的倾斜数据中分离出广州的部分)。这时我们一般的处理方法通过结合geojson的方式进行识别区域性的识别。同理,我们在对一个特殊属性A进行定义时,会在其中额外定义两个基本属性B和C,来对A这一特殊属性进行描述。一般情况下只需要三、四个属性即可:实体唯一标识(id)、三维数据地址(service_url或source_url,source_url用于三维数据的局部更新)和标识实体区域的局部geojson。
geojson数据一般使用postgresql来存储,如果没有意外,所有不同类型实体的,关于三维特殊属性都可以存在一个表(三维图形表)中(因为两个属性是固定的)。所以在使用geojson进行查询时,可以迅速找到范围内实体的所有id,然后根据id,查询图数据库的vid,找到实体。
实时数据
实时数据使用时序数据库(例如,TDEngine)存储,所以相关schema也会在TDEngine中创建,这种对特殊属性类型的处理方式跟三维特殊类型的方式类似。不一样的是,实时数据需要自定义接收程序,然后将数据实时更新到TDEngine中。并且这种程序理论上不会结束,像基于Spark Streaming写的Spark程序。