"""Merge ellipse parameters from CSV files into per-image JSON arc annotations.

For every ``*.json`` annotation that has a matching ``.jpg`` image and at
least one CSV row, the single-point shapes labelled ``"arc"`` are grouped by
their nearest ellipse centre and replaced with one ``"arc"`` shape per
ellipse.  Each new shape carries the grouped points, the ellipse parameters
and the two arc end points.  The rewritten JSON and a copy of the image are
written to ``output_folder``.

The pipeline runs only when the file is executed as a script; importing the
module defines the helpers and the folder configuration without touching the
disk.
"""

import os
import csv
import json
import shutil
import math
from typing import List, Union, Dict

# === Folder configuration ===
csv_folder = r"/data/share/zyh/master_dataset/pokou/merge/251121_251115/csv"  # CSV folder
json_folder_json = r"/data/share/zyh/master_dataset/pokou/merge/251121_251115/json"  # JSON folder
json_folder_img = r"/data/share/zyh/master_dataset/pokou/merge/251121_251115/image"  # image folder
output_folder = r"/data/share/zyh/master_dataset/dataset_net/pokou_251115_251121/to_dataset"  # output folder

# Index pairs over three points, listed in the order used to break ties.
_POINT_PAIRS = ((0, 1), (0, 2), (1, 2))


# ==============================================================
# Arc end points
# ==============================================================
def compute_arc_ends(points: List[List[float]]) -> List[List[float]]:
    """Return the two end points of the arc passing through three points.

    The circle through the three points is computed and the pair of points
    with the largest angular separation (measured the short way round, so at
    most pi) is returned.  The input order does not matter.

    Args:
        points: Three ``[x, y]`` points.

    Returns:
        ``[end_a, end_b]``, two of the input points in their input order.
        If ``points`` does not hold exactly three points the placeholder
        ``[[0, 0], [0, 0]]`` is returned.
    """
    if len(points) != 3:
        return [[0, 0], [0, 0]]

    (x1, y1), (x2, y2), (x3, y3) = points

    # Linear system for the circumcentre (perpendicular-bisector equations).
    a = 2 * (x2 - x1)
    b = 2 * (y2 - y1)
    c = x2**2 + y2**2 - x1**2 - y1**2
    d = 2 * (x3 - x2)
    e = 2 * (y3 - y2)
    f = x3**2 + y3**2 - x2**2 - y2**2
    denom = a * e - b * d

    if denom == 0:
        # Collinear (or coincident) points have no finite circle.  The ends
        # are then the two points farthest apart, whatever the input order.
        def separation(i: int, j: int) -> float:
            return math.hypot(
                points[i][0] - points[j][0], points[i][1] - points[j][1]
            )
    else:
        cx = (c * e - f * b) / denom
        cy = (a * f - d * c) / denom
        angles = [math.atan2(y - cy, x - cx) for x, y in points]

        def separation(i: int, j: int) -> float:
            diff = (angles[j] - angles[i]) % (2 * math.pi)
            return 2 * math.pi - diff if diff > math.pi else diff

    # max() keeps the first maximal pair, so ties resolve in _POINT_PAIRS order.
    first, second = max(_POINT_PAIRS, key=lambda pair: separation(*pair))
    return [points[first], points[second]]


# ==============================================================
# Match a point to the nearest ellipse
# ==============================================================
def match_point_to_ellipse(point: List[float], ellipses: List[Dict]) -> int:
    """Return the index of the ellipse whose centre is closest to ``point``.

    The distance is measured to the centre (``cx``, ``cy``), not to the
    ellipse outline.

    Args:
        point: ``[x, y]`` coordinates.
        ellipses: Dicts that each provide ``"cx"`` and ``"cy"``.

    Returns:
        Index into ``ellipses`` of the nearest centre (the first one on a
        tie), or ``-1`` when ``ellipses`` is empty.

    Raises:
        KeyError: If an ellipse dict lacks ``"cx"`` or ``"cy"``.
    """
    x, y = point
    best_idx = -1
    best_dist = float("inf")
    for idx, ellipse in enumerate(ellipses):
        dist = math.hypot(x - ellipse["cx"], y - ellipse["cy"])
        if dist < best_dist:
            best_dist = dist
            best_idx = idx
    return best_idx


def _parse_shape(shape_str: str):
    """Decode one ``region_shape_attributes`` cell; return None if undecodable."""
    # Second attempt collapses doubled quotes left over from CSV escaping.
    for candidate in (shape_str, shape_str.replace('""', '"')):
        try:
            return json.loads(candidate)
        except json.JSONDecodeError:
            continue
    return None


# ==============================================================
# Read the filename -> ellipse parameters mapping from CSV
# ==============================================================
def _load_csv_ellipses(folder: str) -> Dict[str, List[Dict]]:
    """Collect the shapes with a centre from every ``*.csv`` file in ``folder``."""
    ellipse_map: Dict[str, List[Dict]] = {}  # filename -> list of ellipse params
    # Sorted so the ellipse order is reproducible across runs and platforms.
    for csv_file in sorted(os.listdir(folder)):
        if not csv_file.endswith(".csv"):
            continue
        csv_path = os.path.join(folder, csv_file)
        # newline="" lets the csv module handle embedded line breaks itself.
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
            for row in csv.DictReader(f):
                # DictReader pads short rows with None; normalise before use.
                filename = (row["filename"] or "").strip()
                shape_data = _parse_shape(row["region_shape_attributes"] or "")
                # Register the filename even when the row is unusable so the
                # image still counts as having a CSV record.
                shapes = ellipse_map.setdefault(filename, [])
                if (
                    isinstance(shape_data, dict)
                    and "cx" in shape_data
                    and "cy" in shape_data
                ):
                    shapes.append(shape_data)
                else:
                    print(
                        f"[WARN] {csv_file}: skipped row without ellipse "
                        f"centre for: {filename}"
                    )
    return ellipse_map


def _build_arc_shapes(
    arc_points: List[List[float]], ellipses: List[Dict], filename: str
) -> List[Dict]:
    """Group arc points by nearest ellipse and build one arc shape per ellipse."""
    grouped: List[List[List[float]]] = [[] for _ in ellipses]
    unmatched = 0
    for pt in arc_points:
        idx = match_point_to_ellipse(pt, ellipses)
        if idx < 0:
            # No ellipse available: the point cannot be assigned anywhere.
            unmatched += 1
            continue
        grouped[idx].append(pt)
    if unmatched:
        print(f"[WARN] {filename} has {unmatched} arc points without a matching ellipse")

    shapes = []
    for idx, (ellipse, pts) in enumerate(zip(ellipses, grouped)):
        if len(pts) != 3:
            print(f"[WARN] {filename} ellipse {idx} has {len(pts)} points (expected 3)")
        shapes.append(
            {
                "label": "arc",
                "points": pts,
                "params": [
                    ellipse.get("cx", 0),
                    ellipse.get("cy", 0),
                    ellipse.get("rx", 0),
                    ellipse.get("ry", 0),
                    ellipse.get("theta", 0),
                ],
                # compute_arc_ends yields [[0, 0], [0, 0]] unless len(pts) == 3.
                "ends": compute_arc_ends(pts),
                "group_id": None,
                "description": "",
                "difficult": False,
                "shape_type": "arc",
                "flags": {},
                "attributes": {},
            }
        )
    return shapes


def _replace_arc_shapes(data: Dict, ellipses: List[Dict], filename: str) -> Dict:
    """Swap every ``"arc"`` shape in ``data`` for the per-ellipse arc shapes."""
    shapes = data.get("shapes") or []
    # Only single-point arc annotations contribute points.
    arc_points = [
        s["points"][0]
        for s in shapes
        if s.get("label") == "arc" and len(s.get("points") or []) == 1
    ]
    # Drop all old arcs, then append the new ones.
    kept = [s for s in shapes if s.get("label") != "arc"]
    data["shapes"] = kept + _build_arc_shapes(arc_points, ellipses, filename)
    return data


# ==============================================================
# Walk the JSON files
# ==============================================================
def main() -> None:
    """Run the merge over the configured folders and write the results."""
    os.makedirs(output_folder, exist_ok=True)
    csv_ellipse_map = _load_csv_ellipses(csv_folder)

    for json_file in sorted(os.listdir(json_folder_json)):
        if not json_file.endswith(".json"):
            continue

        json_path = os.path.join(json_folder_json, json_file)
        # Swap only the trailing extension; str.replace would rewrite every
        # ".json" occurrence in the name.
        filename = os.path.splitext(json_file)[0] + ".jpg"  # image name
        img_path = os.path.join(json_folder_img, filename)  # images live in their own folder

        # The image must exist.
        if not os.path.exists(img_path):
            print(f"[WARN] Image not found for: {filename}")
            continue

        # The CSV must contain a record for this image.
        if filename not in csv_ellipse_map:
            print(f"[WARN] No CSV ellipse for: {filename}")
            continue

        # Read the JSON annotation.
        with open(json_path, "r", encoding="utf-8") as jf:
            data = json.load(jf)

        _replace_arc_shapes(data, csv_ellipse_map[filename], filename)

        # Write the merged JSON.
        output_json = os.path.join(output_folder, json_file)
        with open(output_json, "w", encoding="utf-8") as jf:
            json.dump(data, jf, ensure_ascii=False, indent=2)

        # Copy the image next to it.
        shutil.copy2(img_path, os.path.join(output_folder, filename))
        print(f"[OK] Saved merged data for: {filename}")

    print("\nAll done! Output in:", output_folder)


if __name__ == "__main__":
    main()