一、添加字段:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time :2022/2/17 10:03
# @Author :weiz
# @ProjectName :coco_evaluate
# @File :add_key_word.py
# @Description :
import json
import glob
import os
def add_key(annotation_path, dest_path):
    """
    Set ``iscrowd`` to 0 on every annotation of a COCO file and save a copy.

    Each annotation is printed after it has been updated.

    :param annotation_path: path of the COCO JSON file to read
    :param dest_path: path the updated JSON is written to
    :return: None
    """
    with open(annotation_path, 'r') as source:
        dataset = json.load(source)

    # The entries are updated in place, so ``dataset`` carries the change.
    for record in dataset["annotations"]:
        record["iscrowd"] = 0
        print(record)

    with open(dest_path, "w") as target:
        target.write(json.dumps(dataset))
def check_set(annotation_path, images_path):
    """
    Compare the images listed in a COCO file with the ``*.png`` files on disk.

    Prints, in order: the number of images listed in the annotation file,
    the number of ``*.png`` files found directly in ``images_path``, the file
    names that are annotated but missing on disk, and the file names that
    are on disk but not annotated.

    :param annotation_path: path of the COCO JSON file
    :param images_path: directory that is searched for ``*.png`` files
    :return: None
    """
    with open(annotation_path, 'r') as load_f:
        s_json = json.load(load_f)

    annotation_image_list = [entry["file_name"] for entry in s_json["images"]]
    image_list = [
        os.path.basename(image_path)
        for image_path in glob.glob(os.path.join(images_path, "*.png"))
    ]

    print(len(annotation_image_list))
    # Second count is the number of files on disk (the annotation count was
    # previously printed twice by mistake).
    print(len(image_list))

    annotated = set(annotation_image_list)
    on_disk = set(image_list)
    print(annotated - on_disk)
    print(on_disk - annotated)
# Paths used when this script is executed directly.
annotation_path = "./good/annotations.json"
dest_path = "./good/annotations_add.json"
images_path = "./good"


if __name__ == "__main__":
    add_key(annotation_path, dest_path)
    # Optional consistency check of the written file against the image folder:
    # check_set(dest_path, images_path)
二、合并数据集
只能合并不同类别数据:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time :2024/4/11 13:37
# @Author :weiz
# @ProjectName :jiangling_new
# @File :coco_merge.py
# @Description :
import os
import json
import random
def load_json(filenamejson):
    """
    Read a JSON file and return the decoded object.

    :param filenamejson: path of the JSON file
    :return: the parsed JSON content
    """
    with open(filenamejson) as handle:
        return json.loads(handle.read())
def categories_add(categories_all, categories_tmp):
    """
    Append every category of ``categories_tmp`` to ``categories_all``.

    Each appended category gets a fresh id (continuing after the largest id
    already in ``categories_all``, or after 0) and a random RGB ``color``.
    Both the category dicts and ``categories_all`` are modified in place.

    :param categories_all: categories collected so far
    :param categories_tmp: categories of the dataset being merged in
    :return: (categories_all, original ids, newly assigned ids); the two id
             lists are aligned by position
    """
    highest_id = max([0] + [existing["id"] for existing in categories_all])

    categories_id_old = []
    categories_id_new = []
    for incoming in categories_tmp:
        highest_id += 1
        categories_id_old.append(incoming["id"])
        categories_id_new.append(highest_id)
        incoming["id"] = highest_id
        incoming["color"] = [random.randint(0, 255) for _ in range(3)]
        categories_all.append(incoming)

    print("类别合并完成!")
    return categories_all, categories_id_old, categories_id_new
def images_add(images_all, images_tmp):
    """
    Append every image of ``images_tmp`` to ``images_all`` with a new id.

    New ids continue after the largest id already in ``images_all`` (or
    after 0). Image dicts and ``images_all`` are modified in place.

    :param images_all: images collected so far
    :param images_tmp: images of the dataset being merged in
    :return: (images_all, original ids, newly assigned ids); the two id
             lists are aligned by position
    """
    highest_id = max([0] + [existing["id"] for existing in images_all])

    images_id_old = []
    images_id_new = []
    for incoming in images_tmp:
        highest_id += 1
        images_id_old.append(incoming["id"])
        images_id_new.append(highest_id)
        incoming["id"] = highest_id
        images_all.append(incoming)

    print("图片合并完成!")
    return images_all, images_id_old, images_id_new
def annotations_add(annotations_all, annotations_tmp, categories_id_old, categories_id_new, images_id_old, images_id_new):
    """
    Append ``annotations_tmp`` to ``annotations_all`` with remapped ids.

    Every appended annotation receives a new ``id`` (continuing after the
    largest id in ``annotations_all``, or after 0). Its ``category_id`` and
    ``image_id`` are translated from the old id lists to the new ones.
    Annotation dicts and ``annotations_all`` are modified in place.

    :param annotations_all: annotations collected so far
    :param annotations_tmp: annotations of the dataset being merged in
    :param categories_id_old: category ids as used in ``annotations_tmp``
    :param categories_id_new: replacement category ids, aligned by position
    :param images_id_old: image ids as used in ``annotations_tmp``
    :param images_id_new: replacement image ids, aligned by position
    :return: annotations_all
    """
    next_id = max([0] + [annotation["id"] for annotation in annotations_all])

    # Build old -> new lookup tables so that every id is translated exactly
    # once. Rewriting the id while scanning the old-id list lets an already
    # translated id match a later "old" id and be translated again.
    # setdefault keeps the first mapping when an old id is listed twice.
    category_map = {}
    for old_id, new_id in zip(categories_id_old, categories_id_new):
        category_map.setdefault(old_id, new_id)
    image_map = {}
    for old_id, new_id in zip(images_id_old, images_id_new):
        image_map.setdefault(old_id, new_id)

    for annotation in annotations_tmp:
        next_id += 1
        annotation["id"] = next_id
        # Ids without a mapping are left unchanged.
        annotation["category_id"] = category_map.get(annotation["category_id"], annotation["category_id"])
        annotation["image_id"] = image_map.get(annotation["image_id"], annotation["image_id"])
        annotations_all.append(annotation)

    print("标注信息合并完成!")
    return annotations_all
def merge_coco_json(file_path):
    """
    Merge all COCO JSON files found in a directory into a single file.

    Every entry of ``file_path`` except the output file (module-level
    ``g_saveFileName``) is loaded with ``load_json``. The first loaded file
    is taken as is; each later file is merged in through ``categories_add``,
    ``images_add`` and ``annotations_add``. The result is written to
    ``file_path/g_saveFileName`` and the totals are printed.

    :param file_path: directory containing the JSON files to merge
    :return: None
    """
    fileNameList = os.listdir(file_path)
    print(fileNameList)

    categories_all = []
    images_all = []
    annotations_all = []
    info_all = "merage coco of weiz"
    licenses = ""

    # Track the first *loaded* file explicitly: the listing index is not
    # reliable because the output file may be listed first and skipped.
    is_first_file = True
    for fileName in fileNameList:
        if fileName == g_saveFileName:
            print("忽略json名为{}的文件".format(g_saveFileName))
            continue

        # Each file is read exactly once.
        root_data = load_json(os.path.join(file_path, fileName))
        if is_first_file:
            categories_all = root_data["categories"]
            images_all = root_data["images"]
            annotations_all = root_data["annotations"]
            is_first_file = False
            continue

        categories_all, categories_id_old, categories_id_new = categories_add(
            categories_all, root_data["categories"])
        images_all, images_id_old, images_id_new = images_add(images_all, root_data["images"])
        annotations_all = annotations_add(
            annotations_all, root_data["annotations"],
            categories_id_old, categories_id_new,
            images_id_old, images_id_new)

    root_json = {
        "categories": categories_all,
        "images": images_all,
        "annotations": annotations_all,
        "info": info_all,
        "licenses": licenses,
    }
    saveJsonPath = os.path.join(file_path, g_saveFileName)
    with open(saveJsonPath, 'w') as json_file:
        json_file.write(json.dumps(root_json))

    print("类别共有{}类!".format(len(categories_all)))
    print("图片共有{}张!".format(len(images_all)))
    print("标注共有{}个!".format(len(annotations_all)))
# Name of the merged output file; merge_coco_json skips it when scanning.
g_saveFileName = "merge.json"
# Directory whose JSON files are merged.
file_path = "C:/Users/weiz/Desktop/handrail_all/"


if __name__ == "__main__":
    merge_coco_json(file_path)
既能合并不同类别又能合并相同类别的分割数据。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time :2024/9/23 10:54
# @Author :weiz
# @ProjectName :weizTools
# @File :cocoSegmentMerge.py
# @Description :coco格式的segment数据合并
import os
import json
import random
def load_json(filenamejson):
    """
    Read a JSON file and return the decoded object.

    :param filenamejson: path of the JSON file
    :return: the parsed JSON content
    """
    with open(filenamejson) as handle:
        return json.loads(handle.read())
def categories_add(categories_all, categories_tmp):
    """
    Merge ``categories_tmp`` into ``categories_all``, matching by name.

    A category whose ``name`` already exists in ``categories_all`` is not
    appended; its id is mapped to the id of the existing category. Any other
    category gets a fresh id (continuing after the largest existing id, or
    after 0) and a random RGB ``color``, and is appended to
    ``categories_all`` in place.

    :param categories_all: categories collected so far
    :param categories_tmp: categories of the dataset being merged in
    :return: (categories_all, original ids, mapped ids); the two id lists
             are aligned by position
    """
    # Snapshot of the categories present before this merge.
    known_names = [existing["name"] for existing in categories_all]
    known_ids = [existing["id"] for existing in categories_all]
    highest_id = max([0] + known_ids)

    categories_id_old = []
    categories_id_new = []
    for incoming in categories_tmp:
        matched_ids = [
            known_id
            for known_name, known_id in zip(known_names, known_ids)
            if incoming["name"] == known_name
        ]
        if matched_ids:
            # One mapping entry is recorded per matching existing category.
            for known_id in matched_ids:
                categories_id_old.append(incoming["id"])
                categories_id_new.append(known_id)
            continue

        highest_id += 1
        categories_id_old.append(incoming["id"])
        categories_id_new.append(highest_id)
        incoming["id"] = highest_id
        incoming["color"] = [random.randint(0, 255) for _ in range(3)]
        categories_all.append(incoming)

    print("类别合并完成!")
    return categories_all, categories_id_old, categories_id_new
def images_add(images_all, images_tmp):
    """
    Append every image of ``images_tmp`` to ``images_all`` with a new id.

    New ids continue after the largest id already in ``images_all`` (or
    after 0). Image dicts and ``images_all`` are modified in place.

    :param images_all: images collected so far
    :param images_tmp: images of the dataset being merged in
    :return: (images_all, original ids, newly assigned ids); the two id
             lists are aligned by position
    """
    highest_id = max([0] + [existing["id"] for existing in images_all])

    images_id_old = []
    images_id_new = []
    for incoming in images_tmp:
        highest_id += 1
        images_id_old.append(incoming["id"])
        images_id_new.append(highest_id)
        incoming["id"] = highest_id
        images_all.append(incoming)

    print("图片合并完成!")
    return images_all, images_id_old, images_id_new
def annotations_add(annotations_all, annotations_tmp, categories_id_old, categories_id_new, images_id_old,
                    images_id_new):
    """
    Append ``annotations_tmp`` to ``annotations_all`` with remapped ids.

    Every appended annotation receives a new ``id`` (continuing after the
    largest id in ``annotations_all``, or after 0). Its ``category_id`` and
    ``image_id`` are translated from the old id lists to the new ones.
    Annotation dicts and ``annotations_all`` are modified in place.

    :param annotations_all: annotations collected so far
    :param annotations_tmp: annotations of the dataset being merged in
    :param categories_id_old: category ids as used in ``annotations_tmp``
    :param categories_id_new: replacement category ids, aligned by position
    :param images_id_old: image ids as used in ``annotations_tmp``
    :param images_id_new: replacement image ids, aligned by position
    :return: annotations_all
    """
    next_id = max([0] + [annotation["id"] for annotation in annotations_all])

    # Build old -> new lookup tables so that every id is translated exactly
    # once. Rewriting the id while scanning the old-id list lets an already
    # translated id match a later "old" id and be translated again (for
    # example a swapped mapping 1->2, 2->1 would leave category 1 unchanged).
    # setdefault keeps the first mapping when an old id is listed twice.
    category_map = {}
    for old_id, new_id in zip(categories_id_old, categories_id_new):
        category_map.setdefault(old_id, new_id)
    image_map = {}
    for old_id, new_id in zip(images_id_old, images_id_new):
        image_map.setdefault(old_id, new_id)

    for annotation in annotations_tmp:
        next_id += 1
        annotation["id"] = next_id
        # Ids without a mapping are left unchanged.
        annotation["category_id"] = category_map.get(annotation["category_id"], annotation["category_id"])
        annotation["image_id"] = image_map.get(annotation["image_id"], annotation["image_id"])
        annotations_all.append(annotation)

    print("标注信息合并完成!")
    return annotations_all
def merge_coco_json(file_path):
    """
    Merge all COCO JSON files found in a directory into a single file.

    Every entry of ``file_path`` except the output file (module-level
    ``g_saveFileName``) is loaded with ``load_json``. The first loaded file
    is taken as is; each later file is merged in through ``categories_add``,
    ``images_add`` and ``annotations_add``. The result is written to
    ``file_path/g_saveFileName`` and the totals are printed.

    :param file_path: directory containing the JSON files to merge
    :return: None
    """
    fileNameList = os.listdir(file_path)
    print(fileNameList)

    categories_all = []
    images_all = []
    annotations_all = []
    info_all = "merage coco of weiz"
    licenses = ""

    # Track the first *loaded* file explicitly: the listing index is not
    # reliable because the output file may be listed first and skipped.
    is_first_file = True
    for fileName in fileNameList:
        if fileName == g_saveFileName:
            print("忽略json名为{}的文件".format(g_saveFileName))
            continue

        # Each file is read exactly once.
        root_data = load_json(os.path.join(file_path, fileName))
        if is_first_file:
            categories_all = root_data["categories"]
            images_all = root_data["images"]
            annotations_all = root_data["annotations"]
            is_first_file = False
            continue

        categories_all, categories_id_old, categories_id_new = categories_add(
            categories_all, root_data["categories"])
        images_all, images_id_old, images_id_new = images_add(images_all, root_data["images"])
        annotations_all = annotations_add(
            annotations_all, root_data["annotations"],
            categories_id_old, categories_id_new,
            images_id_old, images_id_new)

    root_json = {
        "categories": categories_all,
        "images": images_all,
        "annotations": annotations_all,
        "info": info_all,
        "licenses": licenses,
    }
    saveJsonPath = os.path.join(file_path, g_saveFileName)
    with open(saveJsonPath, 'w') as json_file:
        json_file.write(json.dumps(root_json))

    print("类别共有{}类!{}".format(len(categories_all), categories_all))
    print("图片共有{}张!".format(len(images_all)))
    print("标注共有{}个!".format(len(annotations_all)))
# Name of the merged output file; merge_coco_json skips it when scanning.
g_saveFileName = "merge.json"
# Directory whose JSON files are merged.
file_path = r"C:\Users\Administrator\Desktop\chaoyi_labeling\merge"


if __name__ == "__main__":
    merge_coco_json(file_path)
三、过滤小分割区域
import os
import json
def filter_min_area(jsonPath):
    """
    Keep only the largest-area annotation of every image in a COCO file.

    For each ``image_id`` the annotation with the greatest ``area`` is kept;
    when several annotations share that greatest area the first one in file
    order wins, so exactly one annotation per annotated image remains. Kept
    annotations are renumbered 1..N in their original order. The result is
    written next to the input as ``<name>_new.json``.

    :param jsonPath: path of the COCO JSON file to filter
    :return: the filtered dataset serialized as a JSON string
    """
    with open(jsonPath, 'r') as load_f:
        json_data = json.load(load_f)

    categories = json_data["categories"]
    images = json_data["images"]
    annotations = json_data["annotations"]
    info = json_data["info"]
    licenses = json_data["licenses"]
    print("categories:", categories)
    print("images number:", len(images))
    print("annotations number:", len(annotations))
    print("info:", info)
    print("licenses:", licenses)

    # Single pass: remember the index of the largest annotation per image.
    # The strict ">" keeps the earliest annotation on equal areas.
    best_index = {}
    for index, annotation in enumerate(annotations):
        image_id = annotation["image_id"]
        if image_id not in best_index or annotation["area"] > annotations[best_index[image_id]]["area"]:
            best_index[image_id] = index
    kept_indices = set(best_index.values())

    copy_annotations = []
    for index, annotation in enumerate(annotations):
        if index not in kept_indices:
            print("一个图片存在多个标注文件:", annotation)
            continue
        # Renumber the survivors consecutively, starting at 1.
        annotation["id"] = len(copy_annotations) + 1
        copy_annotations.append(annotation)
    print(len(copy_annotations))

    root_json = {
        "categories": categories,
        "images": images,
        "annotations": copy_annotations,
        "info": info,
        "licenses": licenses,
    }
    json_str = json.dumps(root_json)
    json_new_name = os.path.splitext(jsonPath)[0] + "_new.json"
    with open(json_new_name, 'w') as json_file:
        json_file.write(json_str)
    return json_str
# Split file names and their ratios (train / val / test); neither is
# referenced by the code visible in this script.
g_setNames = ["train.json", "val.json", "test.json"]
g_ratio = [0.8, 0.2, 0]
# COCO annotation file that is filtered when the script is run.
g_segmentPath = "C:/Users/weiz/Desktop/handrail_segment/annotations.json"


if __name__ == "__main__":
    filter_min_area(g_segmentPath)
四、CVAT标注分割数据转COCO分割数据格式
import json
import cv2
import numpy as np
from pycocotools import mask
from pycocotools.coco import COCO
def is_clockwise(contour):
    """
    Report the winding direction of a closed contour.

    Accumulates ``(x2 - x1) * (y2 + y1)`` over every edge, including the
    closing edge from the last point back to the first, and returns whether
    the sum is negative. With the y axis pointing down (image coordinates)
    a negative sum corresponds to a contour that runs clockwise on screen.

    :param contour: sequence of points, each indexed as ``p[0][0]`` (x) and
                    ``p[0][1]`` (y)
    :return: True when the accumulated sum is negative
    """
    total = 0
    count = len(contour)
    for index in range(count):
        current = contour[index]
        # The modulo wraps the last point around to the first one.
        following = contour[(index + 1) % count]
        total += (following[0][0] - current[0][0]) * (following[0][1] + current[0][1])
    return total < 0
def get_merge_point_idx(contour1, contour2):
    """
    Find the closest pair of points between two contours.

    Compares squared distances for every point pair; on equal distances the
    pair found first (scanning ``contour1`` in the outer loop) is kept.

    :param contour1: sequence of points indexed as ``p[0][0]`` / ``p[0][1]``
    :param contour2: sequence of points indexed the same way
    :return: (index into contour1, index into contour2); (0, 0) when either
             contour is empty
    """
    best_first = 0
    best_second = 0
    # A negative value means that no pair has been measured yet.
    shortest = -1
    for first_pos, first_point in enumerate(contour1):
        for second_pos, second_point in enumerate(contour2):
            delta_x = second_point[0][0] - first_point[0][0]
            delta_y = second_point[0][1] - first_point[0][1]
            squared = delta_x ** 2 + delta_y ** 2
            if shortest < 0 or squared < shortest:
                shortest = squared
                best_first = first_pos
                best_second = second_pos
    return best_first, best_second
def merge_contours(contour1, contour2, idx1, idx2):
    """
    Splice ``contour2`` into ``contour1`` at the given connection points.

    The result walks ``contour1`` up to and including ``idx1``, then
    ``contour2`` from ``idx2`` to its end, then ``contour2`` from its start
    up to and including ``idx2``, and finally ``contour1`` from ``idx1`` to
    its end. Both connection points therefore appear twice.

    :param contour1: first contour
    :param contour2: contour that is spliced in
    :param idx1: connection index in ``contour1``
    :param idx2: connection index in ``contour2``
    :return: numpy array holding the merged point sequence
    """
    merged = []
    merged.extend(contour1[:idx1 + 1])
    merged.extend(contour2[idx2:])
    merged.extend(contour2[:idx2 + 1])
    merged.extend(contour1[idx1:])
    return np.array(merged)
def merge_with_parent(contour_parent, contour):
    """
    Merge a child contour into its parent contour.

    The parent is reversed unless ``is_clockwise`` reports True for it, and
    the child is reversed when ``is_clockwise`` reports True for it, so the
    two always run in opposite directions. They are then joined at their
    closest pair of points.

    :param contour_parent: parent contour
    :param contour: child contour to merge into the parent
    :return: the merged contour produced by ``merge_contours``
    """
    outer = contour_parent if is_clockwise(contour_parent) else contour_parent[::-1]
    inner = contour[::-1] if is_clockwise(contour) else contour
    outer_idx, inner_idx = get_merge_point_idx(outer, inner)
    return merge_contours(outer, inner, outer_idx, inner_idx)
def mask2polygon(image):
    """
    Convert a mask image into a list of flat polygon coordinate lists.

    Contours are extracted with ``cv2.RETR_CCOMP``, simplified with
    ``approxPolyDP`` (tolerance 0.1% of the contour perimeter), and every
    contour that has a parent is merged into that parent through
    ``merge_with_parent``. Contours with fewer than 3 points are ignored.

    :param image: mask image passed to ``cv2.findContours``
    :return: list of polygons, each the flattened contour array as a list
    """
    contours, hierarchies = cv2.findContours(image, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_TC89_KCOS)

    simplified = [
        cv2.approxPolyDP(raw, 0.001 * cv2.arcLength(raw, True), True)
        for raw in contours
    ]

    # One slot per contour so hierarchy indices stay valid: contours without
    # a parent keep their points, every other slot holds an empty list.
    outlines = []
    for position, candidate in enumerate(simplified):
        has_no_parent = hierarchies[0][position][3] < 0
        outlines.append(candidate if has_no_parent and len(candidate) >= 3 else [])

    # Fold every usable child contour into the slot of its parent.
    for position, candidate in enumerate(simplified):
        parent_position = hierarchies[0][position][3]
        if parent_position < 0 or len(candidate) < 3:
            continue
        parent_outline = outlines[parent_position]
        if len(parent_outline) == 0:
            continue
        outlines[parent_position] = merge_with_parent(parent_outline, candidate)

    return [outline.flatten().tolist() for outline in outlines if len(outline) != 0]
if __name__ == '__main__':
    # Path of the annotation file exported from CVAT.
    annotations_file = 'D:/user/ljpDesktop/instances_Train.json'
    coco = COCO(annotations_file)

    for img_id in coco.getImgIds():
        # Convert every annotation of the image, not only the first one;
        # an image without annotations simply contributes nothing.
        for annotation in coco.loadAnns(coco.getAnnIds(imgIds=img_id)):
            segmentation = annotation['segmentation']
            if not isinstance(segmentation, dict):
                # Not an RLE dict (e.g. already a polygon list): leave as is.
                continue
            if isinstance(segmentation["counts"], list):
                # Uncompressed RLE has to be converted before decoding.
                segmentation = mask.frPyObjects(segmentation, *segmentation["size"])
            m = mask.decode(segmentation)
            m[m > 0] = 255
            # The annotation dict is updated in place; it is the data that
            # gets dumped from coco.dataset below.
            annotation['segmentation'] = mask2polygon(m)
            annotation['iscrowd'] = 0

    # Path the converted annotations are saved to.
    new_file_path = 'modified_annotations.json'
    with open(new_file_path, 'w') as f:
        json.dump(coco.dataset, f, indent=4)
    print(f"修改后的数据已保存到 {new_file_path}")
五、coco分割数据集划分
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time :2024/4/11 13:37
# @Author :weiz
# @ProjectName :jiangling_new
# @File :coco_merge.py
# @Description :
import os
import json
import random
def load_json(filenamejson):
    """
    Read a JSON file and return the decoded object.

    :param filenamejson: path of the JSON file
    :return: the parsed JSON content
    """
    with open(filenamejson) as handle:
        return json.loads(handle.read())
def _write_coco_subset(save_dir, file_name, categories, images, annotations, info, licenses):
    """Write one split (categories, images, annotations, info, licenses) as JSON."""
    subset = {
        "categories": categories,
        "images": images,
        "annotations": annotations,
        "info": info,
        "licenses": licenses,
    }
    with open(os.path.join(save_dir, file_name), 'w') as json_file:
        json_file.write(json.dumps(subset))


def dataset_partitioning(jsonPath, trainSetRatio=0.8, valSetRatio=0.1, testSetRatio=0.1):
    """
    Randomly split a COCO dataset into train / val / test files.

    Image ids are shuffled; the first ``int(n * testSetRatio)`` go to the
    test split, the next ``int(n * valSetRatio)`` to the val split and all
    remaining ids to the train split. Annotations follow their image; an
    annotation whose ``image_id`` is in neither the train nor the val split
    goes to the test split. ``train.json``, ``val.json`` and ``test.json``
    are written to the directory of ``jsonPath``.

    :param jsonPath: path of the COCO JSON file to split
    :param trainSetRatio: share of images for the train split
    :param valSetRatio: share of images for the val split
    :param testSetRatio: share of images for the test split
    :return: None; when the ratios do not add up to 1 a message is printed
             and nothing is written
    """
    # Compare with a tolerance: sums such as 0.7 + 0.2 + 0.1 are not exactly
    # 1.0 in binary floating point.
    if abs(trainSetRatio + valSetRatio + testSetRatio - 1.0) > 1e-9:
        print("trainSet + valSet + testSet != 1")
        return

    savePath = os.path.dirname(jsonPath)
    with open(jsonPath) as source:
        root_data = json.load(source)
    categories_all = root_data["categories"]
    images_all = root_data["images"]
    annotations_all = root_data["annotations"]
    info_all = root_data["info"]
    licenses_all = root_data["licenses"]

    images_id_all = [image["id"] for image in images_all]
    random.shuffle(images_id_all)
    test_count = int(len(images_id_all) * testSetRatio)
    val_count = int(len(images_id_all) * valSetRatio)

    # Sets give constant-time membership tests in the loops below.
    test_ids = set(images_id_all[:test_count])
    val_ids = set(images_id_all[test_count:test_count + val_count])
    train_ids = set(images_id_all) - test_ids - val_ids

    images_test = []
    images_val = []
    images_train = []
    for image in images_all:
        if image["id"] in train_ids:
            images_train.append(image)
        elif image["id"] in val_ids:
            images_val.append(image)
        else:
            # Store the full image record (not just its id) so that
            # test.json has the same structure as the other splits.
            images_test.append(image)

    annotations_test = []
    annotations_val = []
    annotations_train = []
    for annotation in annotations_all:
        if annotation["image_id"] in train_ids:
            annotations_train.append(annotation)
        elif annotation["image_id"] in val_ids:
            annotations_val.append(annotation)
        else:
            annotations_test.append(annotation)

    print("总数据图片{}张,总标注文件{}个!".format(len(images_all), len(annotations_all)))
    print("训练集图片{}张,标注文件{}个!".format(len(images_train), len(annotations_train)))
    print("验证集集图片{}张,标注文件{}个!".format(len(images_val), len(annotations_val)))
    print("测试集图片{}张,标注文件{}个!".format(len(images_test), len(annotations_test)))

    _write_coco_subset(savePath, "train.json", categories_all, images_train, annotations_train,
                       info_all, licenses_all)
    _write_coco_subset(savePath, "val.json", categories_all, images_val, annotations_val,
                       info_all, licenses_all)
    _write_coco_subset(savePath, "test.json", categories_all, images_test, annotations_test,
                       info_all, licenses_all)
# COCO annotation file that is split when the script is run.
g_annotationsPath = "C:/Users/weiz/Desktop/handrail_all/merge.json"


if __name__ == "__main__":
    # 90% train, 10% val, no test split.
    dataset_partitioning(g_annotationsPath, 0.9, 0.1, 0.0)