| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- import os
- import csv
- import json
- import shutil
- import math
- from typing import List, Union, Dict
- # === 文件夹配置 ===
- csv_folder = r"\\192.168.50.222\share\zyh\master_dataset\pokou\remark_251104\params" # CSV 文件夹
- json_folder = r"\\192.168.50.222\share\zyh\master_dataset\pokou\total" # JSON 和图片文件夹
- output_folder = r"\\192.168.50.222\share\zyh\master_dataset\pokou\251115\csvjson" # 输出文件夹
- os.makedirs(output_folder, exist_ok=True)
- def compute_arc_ends(points: List[List[float]]) -> List[List[float]]:
- if len(points) != 3:
- return [[0,0],[0,0]]
- p1, p2, p3 = points
- x1, y1 = p1
- x2, y2 = p2
- x3, y3 = p3
- # --- 圆心 ---
- A = 2*(x2-x1)
- B = 2*(y2-y1)
- C = x2**2+y2**2-x1**2-y1**2
- D = 2*(x3-x2)
- E = 2*(y3-y2)
- F = x3**2+y3**2-x2**2-y2**2
- denom = A*E - B*D
- if denom == 0:
- return [p1, p3]
- cx = (C*E - F*B)/denom
- cy = (A*F - D*C)/denom
- angles = [math.atan2(y-cy, x-cx) for x,y in points]
- def angle_diff(a1,a2):
- diff = (a2-a1)%(2*math.pi)
- if diff>math.pi: diff=2*math.pi-diff
- return diff
- pairs = [(0,1),(0,2),(1,2)]
- max_diff=-1
- end_pair=(0,1)
- for i,j in pairs:
- diff=angle_diff(angles[i],angles[j])
- if diff>max_diff:
- max_diff=diff
- end_pair=(i,j)
- return [points[end_pair[0]], points[end_pair[1]]]
- # === 工具函数:匹配点到最近椭圆 ===
- def match_point_to_ellipse(point: List[float], ellipses: List[Dict]) -> int:
- """
- 根据点匹配到最近的椭圆
- point: [x,y]
- ellipses: list of dict,每个包含 cx,cy
- return: ellipse_index
- """
- x, y = point
- min_dist = float('inf')
- match_idx = -1
- for i, e in enumerate(ellipses):
- cx, cy = e['cx'], e['cy']
- dist = math.hypot(x - cx, y - cy)
- if dist < min_dist:
- min_dist = dist
- match_idx = i
- return match_idx
- # === 读取 CSV,构建文件名到椭圆参数映射 ===
- csv_ellipse_map = {} # filename -> list of ellipse params
- for csv_file in os.listdir(csv_folder):
- if not csv_file.endswith(".csv"):
- continue
- csv_path = os.path.join(csv_folder, csv_file)
- with open(csv_path, "r", encoding="utf-8-sig") as f:
- reader = csv.DictReader(f)
- for row in reader:
- filename = row["filename"].strip()
- shape_str = row["region_shape_attributes"]
- try:
- shape_data = json.loads(shape_str)
- except json.JSONDecodeError:
- shape_data = json.loads(shape_str.replace('""', '"'))
- if filename not in csv_ellipse_map:
- csv_ellipse_map[filename] = []
- csv_ellipse_map[filename].append(shape_data)
- # === 遍历 JSON 文件 ===
- for json_file in os.listdir(json_folder):
- if not json_file.endswith(".json"):
- continue
- json_path = os.path.join(json_folder, json_file)
- filename = json_file.replace(".json", ".jpg")
- img_path = os.path.join(json_folder, filename)
- if not os.path.exists(img_path):
- print(f"?? Image not found for {filename}")
- continue
- if filename not in csv_ellipse_map:
- print(f"?? CSV ellipse not found for {filename}")
- continue
- with open(json_path, "r", encoding="utf-8") as jf:
- data = json.load(jf)
- if "shapes" not in data:
- data["shapes"] = []
- # 收集所有 arc 单点
- arc_points = [s["points"][0] for s in data["shapes"]
- if s.get("label")=="arc" and "points" in s and len(s["points"])==1]
- # 根据 CSV 椭圆匹配分组
- ellipses = csv_ellipse_map[filename]
- ellipse_point_map = {i: [] for i in range(len(ellipses))}
- for pt in arc_points:
- idx = match_point_to_ellipse(pt, ellipses)
- ellipse_point_map[idx].append(pt)
- # 打印匹配到两个及以上椭圆的图片信息
- active_ellipses = [i for i, pts in ellipse_point_map.items() if len(pts) >= 1]
- if len(active_ellipses) >= 2:
- print(f"Image {filename} matches {len(active_ellipses)} ellipses")
- # 构造新的 arc_shape
- new_arc_shapes = []
- for idx, pts in ellipse_point_map.items():
- if len(pts) != 3:
- print(f"?? {filename} ellipse {idx} points not equal 3, got {len(pts)}")
- ends = [[0,0],[0,0]]
- else:
- ends = compute_arc_ends(pts)
- e = ellipses[idx]
- arc_shape = {
- "label": "arc",
- "points": pts,
- "params": [e.get("cx",0), e.get("cy",0), e.get("rx",0), e.get("ry",0), e.get("theta",0)],
- "ends": ends,
- "group_id": None,
- "description": "",
- "difficult": False,
- "shape_type": "arc",
- "flags": {},
- "attributes": {}
- }
- new_arc_shapes.append(arc_shape)
- # 保留非 arc shapes
- remaining_shapes = [s for s in data["shapes"] if s.get("label") != "arc"]
- data["shapes"] = remaining_shapes + new_arc_shapes
- # 保存 JSON
- output_json = os.path.join(output_folder, json_file)
- with open(output_json, "w", encoding="utf-8") as jf:
- json.dump(data, jf, ensure_ascii=False, indent=2)
- # 复制图片
- shutil.copy2(img_path, os.path.join(output_folder, filename))
- print(f"Saved merged JSON and image for: {filename}")
- print("\nAll done! Final JSONs and images saved in:", output_folder)
|