coco数据集添加字段、合并数据集及过滤小分割区域

一、添加字段:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time        :2022/2/17 10:03
# @Author      :weiz
# @ProjectName :coco_evaluate
# @File        :add_key_word.py
# @Description :
import json
import glob
import os


def add_key(annotation_path, dest_path):
    """
    Add the missing ``iscrowd`` field to the annotations of a COCO json file.

    Annotations that already carry ``iscrowd`` keep their value; every other
    annotation gets ``iscrowd = 0``. Each annotation is printed after it has
    been processed, and the whole document is written to ``dest_path``.
    :param annotation_path: path of the COCO json file to read
    :param dest_path: path the updated json is written to
    :return: None
    """
    # JSON is UTF-8 by specification; do not depend on the platform locale.
    with open(annotation_path, 'r', encoding="utf-8") as load_f:
        s_json = json.load(load_f)

    for annotation in s_json["annotations"]:
        # setdefault only fills the gap, it never overwrites an existing flag.
        annotation.setdefault("iscrowd", 0)
        print(annotation)

    with open(dest_path, "w", encoding="utf-8") as dump_f:
        json.dump(s_json, dump_f)


def check_set(annotation_path, images_path):
    """
    Check that the images listed in a COCO annotation file match the png files on disk.

    Prints, in order: the number of images listed in the annotation file, the
    number of ``*.png`` files found directly inside ``images_path``, the file
    names that are annotated but missing on disk, and the file names that are
    on disk but not annotated.
    :param annotation_path: path of the COCO json file
    :param images_path: directory that holds the png images
    :return: None
    """
    with open(annotation_path, 'r', encoding="utf-8") as load_f:
        s_json = json.load(load_f)

    annotation_image_list = [image["file_name"] for image in s_json["images"]]

    # Only png files are considered, and only at the top level of images_path.
    image_list = [os.path.basename(image_path)
                  for image_path in glob.glob(os.path.join(images_path, "*.png"))]

    print(len(annotation_image_list))
    # The second count must be the number of files on disk, not the annotation count again.
    print(len(image_list))
    print(set(annotation_image_list) - set(image_list))
    print(set(image_list) - set(annotation_image_list))


# Script configuration: source annotation file, destination file that receives
# the added field, and the directory check_set() scans for png images.
annotation_path = "./good/annotations.json"
dest_path = "./good/annotations_add.json"
images_path = "./good"
if __name__ == "__main__":
    add_key(annotation_path, dest_path)
    # Optional consistency check of the written file against the image folder:
    # check_set(dest_path, images_path)

二、合并数据集

      只能合并不同类别数据:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time        :2024/4/11 13:37
# @Author      :weiz
# @ProjectName :jiangling_new
# @File        :coco_merge.py
# @Description :
import os
import json
import random


def load_json(filenamejson):
    """
    Load a json file and return the parsed data.
    :param filenamejson: path of the json file
    :return: the deserialized json content
    """
    # JSON is UTF-8 by specification; the platform default encoding (for
    # example a legacy code page on Windows) would fail on non-ASCII names.
    with open(filenamejson, encoding="utf-8") as f:
        raw_data = json.load(f)
    return raw_data


def categories_add(categories_all, categories_tmp):
    """
    Append every category of ``categories_tmp`` to ``categories_all`` under a fresh id.

    New ids continue after the largest id already present in ``categories_all``
    (or after 0 when there is none). Each appended category dict is modified in
    place: its ``id`` is replaced and a random RGB ``color`` is assigned.
    :param categories_all: list of category dicts collected so far (extended in place)
    :param categories_tmp: list of category dicts to add
    :return: (categories_all, list of original ids, list of the ids that replaced them)
    """
    base_id = max([0] + [entry["id"] for entry in categories_all])

    categories_id_old = []
    categories_id_new = []
    for offset, entry in enumerate(categories_tmp, start=1):
        fresh_id = base_id + offset
        # Record the original id before it is overwritten.
        categories_id_old.append(entry["id"])
        categories_id_new.append(fresh_id)
        entry["id"] = fresh_id
        entry["color"] = [random.randint(0, 255) for _ in range(3)]
        categories_all.append(entry)

    print("类别合并完成!")

    return categories_all, categories_id_old, categories_id_new


def images_add(images_all, images_tmp):
    """
    Append every image of ``images_tmp`` to ``images_all`` under a fresh id.

    New ids continue after the largest id already present in ``images_all``
    (or after 0 when there is none). The appended image dicts are modified in place.
    :param images_all: list of image dicts collected so far (extended in place)
    :param images_tmp: list of image dicts to add
    :return: (images_all, list of original ids, list of the ids that replaced them)
    """
    base_id = max([0] + [entry["id"] for entry in images_all])

    images_id_old = []
    images_id_new = []
    for offset, entry in enumerate(images_tmp, start=1):
        fresh_id = base_id + offset
        # Record the original id before it is overwritten.
        images_id_old.append(entry["id"])
        images_id_new.append(fresh_id)
        entry["id"] = fresh_id
        images_all.append(entry)

    print("图片合并完成!")

    return images_all, images_id_old, images_id_new


def annotations_add(annotations_all, annotations_tmp, categories_id_old, categories_id_new, images_id_old, images_id_new):
    """
    Append the annotations of ``annotations_tmp`` to ``annotations_all``.

    Every appended annotation receives a fresh ``id`` (continuing after the
    largest id in ``annotations_all``) and its ``category_id`` / ``image_id``
    are translated from the old ids to the new ones. The annotation dicts are
    modified in place.
    :param annotations_all: list of annotation dicts collected so far (extended in place)
    :param annotations_tmp: list of annotation dicts to add
    :param categories_id_old: original category ids, parallel to categories_id_new
    :param categories_id_new: replacement category ids
    :param images_id_old: original image ids, parallel to images_id_new
    :param images_id_new: replacement image ids
    :return: annotations_all
    """
    next_id = max([0] + [annotation["id"] for annotation in annotations_all])

    # Build old -> new lookup tables. Each id is translated exactly once;
    # scanning the old-id list and rewriting in place would translate an id
    # again whenever a new id equals a later old id (1 -> 2 -> 3).
    # setdefault keeps the first pairing when an old id is listed twice.
    category_map = {}
    for old_id, new_id in zip(categories_id_old, categories_id_new):
        category_map.setdefault(old_id, new_id)
    image_map = {}
    for old_id, new_id in zip(images_id_old, images_id_new):
        image_map.setdefault(old_id, new_id)

    for annotation in annotations_tmp:
        next_id += 1
        annotation["id"] = next_id
        # Ids without a mapping are left unchanged.
        annotation["category_id"] = category_map.get(annotation["category_id"], annotation["category_id"])
        annotation["image_id"] = image_map.get(annotation["image_id"], annotation["image_id"])
        annotations_all.append(annotation)

    print("标注信息合并完成!")

    return annotations_all


def merge_coco_json(file_path):
    """
    Merge the COCO segmentation json files found in a directory into one file.

    Every entry of ``file_path`` except the one named ``g_saveFileName`` is
    loaded as a COCO json file. The first loaded file is taken as is; each
    following file is merged into it through categories_add, images_add and
    annotations_add. The result is written to ``file_path/g_saveFileName``.
    :param file_path: directory that holds the json files to merge
    :return: None
    """
    fileNameList = os.listdir(file_path)
    print(fileNameList)

    categories_all = []
    images_all = []
    annotations_all = []
    info_all = "merage coco of weiz"
    licenses = ""
    # Track the first merged file explicitly: the position in the directory
    # listing is not reliable because the output file may be listed first.
    is_first = True
    for fileName in fileNameList:
        if fileName == g_saveFileName:
            print("忽略json名为{}的文件".format(g_saveFileName))
            continue

        root_data = load_json(os.path.join(file_path, fileName))

        if is_first:
            # The first dataset keeps its ids and becomes the merge base.
            categories_all = root_data["categories"]
            images_all = root_data["images"]
            annotations_all = root_data["annotations"]
            is_first = False
            continue

        categories_all, categories_id_old, categories_id_new = categories_add(categories_all, root_data["categories"])
        images_all, images_id_old, images_id_new = images_add(images_all, root_data["images"])
        annotations_all = annotations_add(annotations_all, root_data["annotations"],
                                          categories_id_old, categories_id_new,
                                          images_id_old, images_id_new)

    root_json = {
        "categories": categories_all,
        "images": images_all,
        "annotations": annotations_all,
        "info": info_all,
        "licenses": licenses,
    }

    saveJsonPath = os.path.join(file_path, g_saveFileName)
    with open(saveJsonPath, 'w') as json_file:
        json_file.write(json.dumps(root_json))

    print("类别共有{}类!".format(len(categories_all)))
    print("图片共有{}张!".format(len(images_all)))
    print("标注共有{}个!".format(len(annotations_all)))


# Name of the merged output file; merge_coco_json reads this global, writes the
# result under this name and skips a directory entry with this name.
g_saveFileName = "merge.json"
file_path = "C:/Users/weiz/Desktop/handrail_all/"  # directory holding the json files to merge
if __name__ == "__main__":
    merge_coco_json(file_path)

 既能合并不同类别,又能合并相同类别的分割数据:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time        :2024/9/23 10:54
# @Author      :weiz
# @ProjectName :weizTools
# @File        :cocoSegmentMerge.py
# @Description :coco格式的segment数据合并
import os
import json
import random


def load_json(filenamejson):
    """
    Load a json file and return the parsed data.
    :param filenamejson: path of the json file
    :return: the deserialized json content
    """
    # JSON is UTF-8 by specification; the platform default encoding (for
    # example a legacy code page on Windows) would fail on non-ASCII names.
    with open(filenamejson, encoding="utf-8") as f:
        raw_data = json.load(f)
    return raw_data


def categories_add(categories_all, categories_tmp):
    """
    Merge ``categories_tmp`` into ``categories_all``, matching categories by name.

    A category whose name already exists in ``categories_all`` (as it was when
    the function was called) is not appended; its id is mapped to the id of the
    existing category. Any other category is appended under a fresh id that
    continues after the largest existing id, and gets a random RGB ``color``.
    :param categories_all: list of category dicts collected so far (extended in place)
    :param categories_tmp: list of category dicts to merge in
    :return: (categories_all, list of original ids, list of the ids they map to)
    """
    # Snapshot of the names/ids known before anything is appended.
    known_names = [entry["name"] for entry in categories_all]
    known_ids = [entry["id"] for entry in categories_all]
    next_id = max([0] + known_ids)

    categories_id_old = []
    categories_id_new = []
    for entry in categories_tmp:
        matched_ids = [known_ids[pos] for pos, name in enumerate(known_names) if entry["name"] == name]
        if matched_ids:
            # One mapping pair per existing category carrying the same name.
            categories_id_old.extend([entry["id"]] * len(matched_ids))
            categories_id_new.extend(matched_ids)
            continue

        next_id += 1
        categories_id_old.append(entry["id"])
        categories_id_new.append(next_id)
        entry["id"] = next_id
        entry["color"] = [random.randint(0, 255) for _ in range(3)]
        categories_all.append(entry)

    print("类别合并完成!")

    return categories_all, categories_id_old, categories_id_new


def images_add(images_all, images_tmp):
    """
    Append every image of ``images_tmp`` to ``images_all`` under a fresh id.

    New ids continue after the largest id already present in ``images_all``
    (or after 0 when there is none). The appended image dicts are modified in place.
    :param images_all: list of image dicts collected so far (extended in place)
    :param images_tmp: list of image dicts to add
    :return: (images_all, list of original ids, list of the ids that replaced them)
    """
    base_id = max([0] + [entry["id"] for entry in images_all])

    images_id_old = []
    images_id_new = []
    for offset, entry in enumerate(images_tmp, start=1):
        fresh_id = base_id + offset
        # Record the original id before it is overwritten.
        images_id_old.append(entry["id"])
        images_id_new.append(fresh_id)
        entry["id"] = fresh_id
        images_all.append(entry)

    print("图片合并完成!")

    return images_all, images_id_old, images_id_new


def annotations_add(annotations_all, annotations_tmp, categories_id_old, categories_id_new, images_id_old,
                    images_id_new):
    """
    Append the annotations of ``annotations_tmp`` to ``annotations_all``.

    Every appended annotation receives a fresh ``id`` (continuing after the
    largest id in ``annotations_all``) and its ``category_id`` / ``image_id``
    are translated from the old ids to the new ones. The annotation dicts are
    modified in place.
    :param annotations_all: list of annotation dicts collected so far (extended in place)
    :param annotations_tmp: list of annotation dicts to add
    :param categories_id_old: original category ids, parallel to categories_id_new
    :param categories_id_new: replacement category ids
    :param images_id_old: original image ids, parallel to images_id_new
    :param images_id_new: replacement image ids
    :return: annotations_all
    """
    next_id = max([0] + [annotation["id"] for annotation in annotations_all])

    # Build old -> new lookup tables. Each id is translated exactly once;
    # scanning the old-id list and rewriting in place would translate an id
    # again whenever a new id equals a later old id (1 -> 2 -> 3).
    # setdefault keeps the first pairing when an old id is listed twice.
    category_map = {}
    for old_id, new_id in zip(categories_id_old, categories_id_new):
        category_map.setdefault(old_id, new_id)
    image_map = {}
    for old_id, new_id in zip(images_id_old, images_id_new):
        image_map.setdefault(old_id, new_id)

    for annotation in annotations_tmp:
        next_id += 1
        annotation["id"] = next_id
        # Ids without a mapping are left unchanged.
        annotation["category_id"] = category_map.get(annotation["category_id"], annotation["category_id"])
        annotation["image_id"] = image_map.get(annotation["image_id"], annotation["image_id"])
        annotations_all.append(annotation)

    print("标注信息合并完成!")

    return annotations_all


def merge_coco_json(file_path):
    """
    Merge the COCO segmentation json files found in a directory into one file.

    Every entry of ``file_path`` except the one named ``g_saveFileName`` is
    loaded as a COCO json file. The first loaded file is taken as is; each
    following file is merged into it through categories_add, images_add and
    annotations_add. The result is written to ``file_path/g_saveFileName``.
    :param file_path: directory that holds the json files to merge
    :return: None
    """
    fileNameList = os.listdir(file_path)
    print(fileNameList)

    categories_all = []
    images_all = []
    annotations_all = []
    info_all = "merage coco of weiz"
    licenses = ""
    # Track the first merged file explicitly: the position in the directory
    # listing is not reliable because the output file may be listed first.
    is_first = True
    for fileName in fileNameList:
        if fileName == g_saveFileName:
            print("忽略json名为{}的文件".format(g_saveFileName))
            continue

        root_data = load_json(os.path.join(file_path, fileName))

        if is_first:
            # The first dataset keeps its ids and becomes the merge base.
            categories_all = root_data["categories"]
            images_all = root_data["images"]
            annotations_all = root_data["annotations"]
            is_first = False
            continue

        categories_all, categories_id_old, categories_id_new = categories_add(categories_all, root_data["categories"])
        images_all, images_id_old, images_id_new = images_add(images_all, root_data["images"])
        annotations_all = annotations_add(annotations_all, root_data["annotations"],
                                          categories_id_old, categories_id_new,
                                          images_id_old, images_id_new)

    root_json = {
        "categories": categories_all,
        "images": images_all,
        "annotations": annotations_all,
        "info": info_all,
        "licenses": licenses,
    }

    saveJsonPath = os.path.join(file_path, g_saveFileName)
    with open(saveJsonPath, 'w') as json_file:
        json_file.write(json.dumps(root_json))

    print("类别共有{}类!{}".format(len(categories_all), categories_all))
    print("图片共有{}张!".format(len(images_all)))
    print("标注共有{}个!".format(len(annotations_all)))


# Name of the merged output file; merge_coco_json reads this global, writes the
# result under this name and skips a directory entry with this name.
g_saveFileName = "merge.json"
file_path = r"C:\Users\Administrator\Desktop\chaoyi_labeling\merge"  # directory holding the json files to merge
if __name__ == "__main__":
    merge_coco_json(file_path)

三、过滤小分割区域

import os
import json


def filter_min_area(jsonPath):
    """
    Keep only the largest segmentation of every image in a COCO json file.

    An annotation is dropped when another annotation of the same image has a
    strictly larger ``area``; annotations tied for the largest area are all
    kept. The surviving annotations are renumbered from 1 in their original
    order and the result is written next to the input as ``<name>_new.json``.
    :param jsonPath: path of the COCO json file to filter
    :return: the filtered dataset serialized as a json string
    """
    with open(jsonPath, 'r', encoding="utf-8") as load_f:
        json_data = json.load(load_f)
    categories = json_data["categories"]
    images = json_data["images"]
    annotations = json_data["annotations"]
    info = json_data["info"]
    licenses = json_data["licenses"]

    print("categories:", categories)
    print("images number:", len(images))
    print("annotations number:", len(annotations))
    print("info:", info)
    print("licenses:", licenses)

    # One pass to find the largest area per image replaces comparing every
    # annotation against every other annotation.
    max_area = {}
    for annotation in annotations:
        image_id = annotation["image_id"]
        if image_id not in max_area or annotation["area"] > max_area[image_id]:
            max_area[image_id] = annotation["area"]

    copy_annotations = []
    for annotation in annotations:
        if max_area[annotation["image_id"]] > annotation["area"]:
            # A larger region exists on the same image: drop this one.
            print("一个图片存在多个标注文件:", annotation)
            continue
        annotation["id"] = len(copy_annotations) + 1
        copy_annotations.append(annotation)

    print(len(copy_annotations))
    root_json = {
        "categories": categories,
        "images": images,
        "annotations": copy_annotations,
        "info": info,
        "licenses": licenses,
    }
    json_str = json.dumps(root_json)

    json_new_name = os.path.splitext(jsonPath)[0] + "_new.json"
    with open(json_new_name, 'w', encoding="utf-8") as json_file:
        json_file.write(json_str)

    return json_str


# NOTE(review): g_setNames and g_ratio are not referenced by the code visible
# in this script; only g_segmentPath is used below.
g_setNames = ["train.json", "val.json", "test.json"]
g_ratio = [0.8, 0.2, 0]  # train set, validation set, test set
# COCO annotation file to filter; the result is written next to it.
g_segmentPath = "C:/Users/weiz/Desktop/handrail_segment/annotations.json"
if __name__ == "__main__":
    filter_min_area(g_segmentPath)

四、CVAT标注分割数据转COCO分割数据格式

import json
import cv2
import numpy as np
from pycocotools import mask
from pycocotools.coco import COCO

def is_clockwise(contour):
    """
    Decide the winding direction of a closed contour from a signed edge sum.

    Sums ``(x2 - x1) * (y2 + y1)`` over every edge, including the closing edge
    from the last point back to the first, and reports whether the sum is negative.
    NOTE(review): which visual direction a negative sum corresponds to depends
    on the orientation of the y axis -- confirm against the caller.
    :param contour: sequence of points, each indexed as point[0][0] (x) and point[0][1] (y)
    :return: True when the signed sum is negative
    """
    count = len(contour)
    signed_sum = sum(
        (contour[(k + 1) % count][0][0] - contour[k][0][0])
        * (contour[(k + 1) % count][0][1] + contour[k][0][1])
        for k in range(count)
    )
    return signed_sum < 0

def get_merge_point_idx(contour1, contour2):
    """
    Find the closest pair of points between two contours.

    Compares every point of ``contour1`` with every point of ``contour2`` by
    squared Euclidean distance; on ties the first pair encountered wins.
    :param contour1: sequence of points, each indexed as point[0][0] (x) and point[0][1] (y)
    :param contour2: sequence of points in the same layout
    :return: (index into contour1, index into contour2); (0, 0) when either contour is empty
    """
    closest = (0, 0)
    distance_min = -1  # negative means "no pair examined yet"
    for i, p1 in enumerate(contour1):
        for j, p2 in enumerate(contour2):
            distance = (p2[0][0] - p1[0][0]) ** 2 + (p2[0][1] - p1[0][1]) ** 2
            if distance_min < 0 or distance < distance_min:
                distance_min = distance
                closest = (i, j)
    return closest

def merge_contours(contour1, contour2, idx1, idx2):
    """
    Splice ``contour2`` into ``contour1`` at a pair of bridge points.

    The result walks contour1 from its start to idx1, then contour2 from idx2
    to its end and from its start back to idx2, and finally contour1 from idx1
    to its end. Both bridge points therefore appear twice.
    :param contour1: first contour (indexable sequence of points)
    :param contour2: second contour (indexable sequence of points)
    :param idx1: index of the bridge point in contour1
    :param idx2: index of the bridge point in contour2
    :return: numpy array holding the merged point sequence
    """
    first_head = [contour1[k] for k in range(idx1 + 1)]
    second_tail = [contour2[k] for k in range(idx2, len(contour2))]
    second_head = [contour2[k] for k in range(idx2 + 1)]
    first_tail = [contour1[k] for k in range(idx1, len(contour1))]
    return np.array(first_head + second_tail + second_head + first_tail)

def merge_with_parent(contour_parent, contour):
    """
    Merge a child contour into its parent contour.

    The parent is reversed unless is_clockwise() reports True for it, the child
    is reversed when is_clockwise() reports True for it, and the two are then
    joined at their closest pair of points.
    :param contour_parent: parent contour
    :param contour: child contour
    :return: the merged contour produced by merge_contours
    """
    outer = contour_parent if is_clockwise(contour_parent) else contour_parent[::-1]
    inner = contour[::-1] if is_clockwise(contour) else contour
    idx1, idx2 = get_merge_point_idx(outer, inner)
    return merge_contours(outer, inner, idx1, idx2)

def mask2polygon(image):
    """
    Convert a mask image into a list of flat polygon coordinate lists.

    Contours are extracted with cv2.findContours (RETR_CCOMP) and simplified
    with cv2.approxPolyDP. Contours without a parent become polygons; contours
    that have a parent are merged into that parent through merge_with_parent.
    Contours with fewer than 3 points are ignored.
    :param image: mask image passed to cv2.findContours
    :return: list of polygons, each a flat list produced by flatten().tolist()
    """
    contours, hierarchies = cv2.findContours(image, cv2.RETR_CCOMP, cv2.CHAIN_APPROX_TC89_KCOS)

    # Simplify every contour with a tolerance of 0.1% of its perimeter.
    simplified = []
    for raw in contours:
        tolerance = 0.001 * cv2.arcLength(raw, True)
        simplified.append(cv2.approxPolyDP(raw, tolerance, True))

    # One slot per contour so parent indices stay valid; slots of children and
    # of too-small contours hold an empty list.
    outlines = []
    for idx, shape in enumerate(simplified):
        has_no_parent = hierarchies[0][idx][3] < 0
        outlines.append(shape if has_no_parent and len(shape) >= 3 else [])

    # Fold every usable child contour into its parent's outline.
    for idx, shape in enumerate(simplified):
        owner = hierarchies[0][idx][3]
        if owner < 0 or len(shape) < 3:
            continue
        if len(outlines[owner]) == 0:
            continue
        outlines[owner] = merge_with_parent(outlines[owner], shape)

    return [shape.flatten().tolist() for shape in outlines if len(shape) != 0]


if __name__ == '__main__':
    # Path of the annotation file exported by CVAT
    annotations_file = 'D:/user/ljpDesktop/instances_Train.json'
    # Build the COCO index for the file
    coco = COCO(annotations_file)
    # Ids of all images
    image_ids = coco.getImgIds()
    for img_id in image_ids:
        # All annotations of this image. The script relies on loadAnns handing
        # back the dicts held by coco.dataset, so the in-place edits below are
        # what gets written out at the end.
        annotations_ids = coco.getAnnIds(imgIds=img_id)
        # Convert every annotation of the image, not just the first one; this
        # also copes with images that have no annotation at all.
        for annotation in coco.loadAnns(annotations_ids):
            segmentation = annotation['segmentation']
            if not isinstance(segmentation, dict):
                # Not an RLE dict (e.g. already a polygon list): nothing to convert.
                continue
            if isinstance(segmentation["counts"], list):
                # Uncompressed RLE must be converted before it can be decoded.
                segmentation = mask.frPyObjects(segmentation, *segmentation["size"])
            m = mask.decode(segmentation)
            m[m > 0] = 255
            annotation['segmentation'] = mask2polygon(m)
            annotation['iscrowd'] = 0

    # Path the modified annotations are saved to
    new_file_path = 'modified_annotations.json'
    with open(new_file_path, 'w') as f:
        json.dump(coco.dataset, f, indent=4)

    print(f"修改后的数据已保存到 {new_file_path}")

五、coco分割数据集划分

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time        :2024/4/11 13:37
# @Author      :weiz
# @ProjectName :jiangling_new
# @File        :coco_merge.py
# @Description :
import os
import json
import random


def load_json(filenamejson):
    """
    Load a json file and return the parsed data.
    :param filenamejson: path of the json file
    :return: the deserialized json content
    """
    # JSON is UTF-8 by specification; the platform default encoding (for
    # example a legacy code page on Windows) would fail on non-ASCII names.
    with open(filenamejson, encoding="utf-8") as f:
        raw_data = json.load(f)
    return raw_data


def dataset_partitioning(jsonPath, trainSetRatio=0.8, valSetRatio=0.1, testSetRatio=0.1):
    """
    Randomly split a COCO segmentation dataset into train / val / test files.

    Image ids are shuffled; the first ``int(n * testSetRatio)`` go to the test
    set, the next ``int(n * valSetRatio)`` to the validation set and all the
    remaining ones to the training set. ``train.json``, ``val.json`` and
    ``test.json`` are written into the directory of ``jsonPath``; all three
    share the categories, info and licenses of the source file.
    :param jsonPath: path of the COCO json file to split
    :param trainSetRatio: share of images for the training set
    :param valSetRatio: share of images for the validation set
    :param testSetRatio: share of images for the test set
    :return: None (a message is printed and nothing is written when the ratios do not sum to 1)
    """
    # Compare with a tolerance: sums such as 0.7 + 0.2 + 0.1 are not exactly
    # 1.0 in floating point and would be rejected by an equality test.
    if abs(trainSetRatio + valSetRatio + testSetRatio - 1.0) > 1e-9:
        print("trainSet + valSet + testSet != 1")
        return

    savePath = os.path.dirname(jsonPath)

    root_data = load_json(jsonPath)

    categories_all = root_data["categories"]
    images_all = root_data["images"]
    annotations_all = root_data["annotations"]
    info_all = root_data["info"]
    licenses_all = root_data["licenses"]

    images_id_all = [image["id"] for image in images_all]
    random.shuffle(images_id_all)

    test_index = int(len(images_id_all) * testSetRatio)
    val_index = int(len(images_id_all) * valSetRatio)

    # Sets give constant-time membership tests in the loops below.
    testImagesIDSet = set(images_id_all[:test_index])
    valImagesIDSet = set(images_id_all[test_index:test_index + val_index])
    trainImagesIDSet = set(images_id_all) - testImagesIDSet - valImagesIDSet

    images_test = []
    images_val = []
    images_train = []
    for image in images_all:
        if image["id"] in trainImagesIDSet:
            images_train.append(image)
        elif image["id"] in valImagesIDSet:
            images_val.append(image)
        else:
            # Store the image record itself, like the other two subsets do.
            images_test.append(image)

    annotations_test = []
    annotations_val = []
    annotations_train = []
    for annotation in annotations_all:
        if annotation["image_id"] in trainImagesIDSet:
            annotations_train.append(annotation)
        elif annotation["image_id"] in valImagesIDSet:
            annotations_val.append(annotation)
        else:
            # Everything else, including annotations whose image_id matches no
            # image record, ends up in the test set.
            annotations_test.append(annotation)
    print("总数据图片{}张,总标注文件{}个!".format(len(images_all), len(annotations_all)))
    print("训练集图片{}张,标注文件{}个!".format(len(images_train), len(annotations_train)))
    print("验证集集图片{}张,标注文件{}个!".format(len(images_val), len(annotations_val)))
    print("测试集图片{}张,标注文件{}个!".format(len(images_test), len(annotations_test)))

    subsets = (
        ("train.json", images_train, annotations_train),
        ("val.json", images_val, annotations_val),
        ("test.json", images_test, annotations_test),
    )
    for set_file_name, set_images, set_annotations in subsets:
        subset_root = {
            "categories": categories_all,
            "images": set_images,
            "annotations": set_annotations,
            "info": info_all,
            "licenses": licenses_all,
        }
        saveJsonPath = os.path.join(savePath, set_file_name)
        with open(saveJsonPath, 'w') as json_file:
            json_file.write(json.dumps(subset_root))


# COCO annotation file to split; train.json / val.json / test.json are written
# into the same directory.
g_annotationsPath = "C:/Users/weiz/Desktop/handrail_all/merge.json"
if __name__ == "__main__":
    # 90% training, 10% validation, no test images.
    dataset_partitioning(g_annotationsPath, 0.9, 0.1, 0.0)

评论 5
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值