import os
import csv
import json
import shutil
import math
from typing import List, Union, Dict

# === Folder configuration ===
csv_folder = r"\\192.168.50.222\share\zyh\master_dataset\pokou\remark_251104\params"  # CSV folder
json_folder = r"\\192.168.50.222\share\zyh\master_dataset\pokou\total"  # JSON and image folder
output_folder = r"\\192.168.50.222\share\zyh\master_dataset\pokou\251115\csvjson"  # output folder


def compute_arc_ends(points: List[List[float]]) -> List[List[float]]:
    """Pick the two end points out of three points sampled on an arc.

    The circle through the three points is computed, each point is converted
    to its polar angle around the circle centre, and the pair with the
    largest angular separation (measured the short way round, so at most pi)
    is returned. This implicitly treats the arc as spanning no more than half
    a circle.

    Args:
        points: Three ``[x, y]`` points, in any order.

    Returns:
        ``[end_a, end_b]`` taken from ``points`` (the same list objects).
        ``[[0, 0], [0, 0]]`` when ``points`` does not hold exactly three
        entries. For collinear points, where no circle exists, the two
        points that are farthest apart.
    """
    if len(points) != 3:
        return [[0, 0], [0, 0]]

    p1, p2, p3 = points
    x1, y1 = p1
    x2, y2 = p2
    x3, y3 = p3
    pairs = [(0, 1), (0, 2), (1, 2)]

    # --- circle centre: intersection of the two perpendicular bisectors ---
    A = 2 * (x2 - x1)
    B = 2 * (y2 - y1)
    C = x2 ** 2 + y2 ** 2 - x1 ** 2 - y1 ** 2
    D = 2 * (x3 - x2)
    E = 2 * (y3 - y2)
    F = x3 ** 2 + y3 ** 2 - x2 ** 2 - y2 ** 2
    denom = A * E - B * D
    if denom == 0:
        # Collinear (or coincident) points: there is no circle. The input
        # order is not guaranteed to be end/middle/end, so choose the pair
        # with the greatest distance instead of assuming p1 and p3.
        i, j = max(
            pairs,
            key=lambda ij: math.hypot(
                points[ij[0]][0] - points[ij[1]][0],
                points[ij[0]][1] - points[ij[1]][1],
            ),
        )
        return [points[i], points[j]]

    cx = (C * E - F * B) / denom
    cy = (A * F - D * C) / denom
    angles = [math.atan2(y - cy, x - cx) for x, y in points]

    def angle_diff(a1, a2):
        # Smallest absolute angular distance between two directions.
        diff = (a2 - a1) % (2 * math.pi)
        if diff > math.pi:
            diff = 2 * math.pi - diff
        return diff

    max_diff = -1
    end_pair = (0, 1)
    for i, j in pairs:
        diff = angle_diff(angles[i], angles[j])
        if diff > max_diff:
            max_diff = diff
            end_pair = (i, j)
    return [points[end_pair[0]], points[end_pair[1]]]


# === Helper: match a point to the nearest ellipse ===
def match_point_to_ellipse(point: List[float], ellipses: List[Dict]) -> int:
    """Return the index of the ellipse whose centre is closest to ``point``.

    Args:
        point: ``[x, y]``.
        ellipses: Dicts that each provide ``cx`` and ``cy``.

    Returns:
        Index into ``ellipses`` of the nearest centre (the first one wins on
        a tie), or ``-1`` when ``ellipses`` is empty.
    """
    x, y = point
    min_dist = float('inf')
    match_idx = -1
    for i, e in enumerate(ellipses):
        dist = math.hypot(x - e['cx'], y - e['cy'])
        if dist < min_dist:
            min_dist = dist
            match_idx = i
    return match_idx


def _parse_shape_attributes(shape_str):
    """Decode one ``region_shape_attributes`` cell; return None if it is not valid JSON."""
    try:
        return json.loads(shape_str)
    except json.JSONDecodeError:
        pass
    try:
        # Some exports keep CSV-style doubled quotes inside the cell.
        return json.loads(shape_str.replace('""', '"'))
    except json.JSONDecodeError:
        return None


def _load_csv_ellipses(csv_dir):
    """Read every ``*.csv`` in ``csv_dir`` into a ``filename -> [ellipse params]`` map."""
    ellipse_map = {}
    for csv_file in os.listdir(csv_dir):
        if not csv_file.endswith(".csv"):
            continue
        csv_path = os.path.join(csv_dir, csv_file)
        # newline="" lets the csv module handle line endings inside quoted cells.
        with open(csv_path, "r", encoding="utf-8-sig", newline="") as f:
            for row in csv.DictReader(f):
                filename = row["filename"].strip()
                shape_data = _parse_shape_attributes(row["region_shape_attributes"])
                if shape_data is None:
                    print(f"?? Could not parse region_shape_attributes for {filename} in {csv_file}")
                    continue
                # Rows without a centre (for example an empty "{}") cannot be
                # matched against points, so they are left out of the map.
                if not isinstance(shape_data, dict) or "cx" not in shape_data or "cy" not in shape_data:
                    continue
                ellipse_map.setdefault(filename, []).append(shape_data)
    return ellipse_map


def _build_arc_shape(ellipse, pts, ends):
    """Assemble the merged ``arc`` shape entry for one ellipse."""
    return {
        "label": "arc",
        "points": pts,
        "params": [
            ellipse.get("cx", 0),
            ellipse.get("cy", 0),
            ellipse.get("rx", 0),
            ellipse.get("ry", 0),
            ellipse.get("theta", 0),
        ],
        "ends": ends,
        "group_id": None,
        "description": "",
        "difficult": False,
        "shape_type": "arc",
        "flags": {},
        "attributes": {},
    }


def main():
    """Merge single-point ``arc`` shapes with CSV ellipse parameters and export them.

    For every ``*.json`` in ``json_folder`` that has a same-named ``.jpg``
    and at least one ellipse in the CSV data, the single-point ``arc`` shapes
    are grouped by nearest ellipse centre, replaced by one ``arc`` shape per
    ellipse, and the JSON plus image are written to ``output_folder``.
    """
    os.makedirs(output_folder, exist_ok=True)

    # === Read the CSVs: filename -> list of ellipse params ===
    csv_ellipse_map = _load_csv_ellipses(csv_folder)

    # === Walk the JSON files ===
    for json_file in os.listdir(json_folder):
        if not json_file.endswith(".json"):
            continue
        json_path = os.path.join(json_folder, json_file)
        # Swap only the extension; str.replace would also rewrite ".json"
        # occurring elsewhere in the name.
        filename = os.path.splitext(json_file)[0] + ".jpg"
        img_path = os.path.join(json_folder, filename)
        if not os.path.exists(img_path):
            print(f"?? Image not found for {filename}")
            continue
        if filename not in csv_ellipse_map:
            print(f"?? CSV ellipse not found for {filename}")
            continue

        with open(json_path, "r", encoding="utf-8") as jf:
            data = json.load(jf)
        shapes = data.setdefault("shapes", [])

        # Collect every single-point arc annotation.
        arc_points = [
            s["points"][0]
            for s in shapes
            if s.get("label") == "arc" and "points" in s and len(s["points"]) == 1
        ]

        # Group the points by the nearest CSV ellipse.
        ellipses = csv_ellipse_map[filename]
        ellipse_point_map = {i: [] for i in range(len(ellipses))}
        for pt in arc_points:
            ellipse_point_map[match_point_to_ellipse(pt, ellipses)].append(pt)

        # Report images whose points fall on two or more ellipses.
        active_ellipses = [i for i, pts in ellipse_point_map.items() if pts]
        if len(active_ellipses) >= 2:
            print(f"Image {filename} matches {len(active_ellipses)} ellipses")

        # Build one new arc shape per ellipse (also for ellipses with no points).
        new_arc_shapes = []
        for idx, pts in ellipse_point_map.items():
            if len(pts) != 3:
                print(f"?? {filename} ellipse {idx} points not equal 3, got {len(pts)}")
                ends = [[0, 0], [0, 0]]
            else:
                ends = compute_arc_ends(pts)
            new_arc_shapes.append(_build_arc_shape(ellipses[idx], pts, ends))

        # Keep the non-arc shapes.
        # NOTE(review): arc-labelled shapes that do not have exactly one point
        # are dropped here without being merged - confirm that is intended.
        remaining_shapes = [s for s in shapes if s.get("label") != "arc"]
        data["shapes"] = remaining_shapes + new_arc_shapes

        # Save the JSON.
        output_json = os.path.join(output_folder, json_file)
        with open(output_json, "w", encoding="utf-8") as jf:
            json.dump(data, jf, ensure_ascii=False, indent=2)

        # Copy the image next to it.
        shutil.copy2(img_path, os.path.join(output_folder, filename))
        print(f"Saved merged JSON and image for: {filename}")

    print("\nAll done! Final JSONs and images saved in:", output_folder)


if __name__ == "__main__":
    main()