| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197 |
- import os
- import csv
- import json
- import shutil
- import math
- from typing import List, Union, Dict
# === Folder configuration ===
# Folder holding the CSV files.
csv_folder = r"/data/share/zyh/master_dataset/pokou/merge/251121_251115/csv"
# Folder holding the JSON files.
json_folder_json = r"/data/share/zyh/master_dataset/pokou/merge/251121_251115/json"
# Folder holding the images.
json_folder_img = r"/data/share/zyh/master_dataset/pokou/merge/251121_251115/image"
# Folder that receives the output.
output_folder = r"/data/share/zyh/master_dataset/dataset_net/pokou_251115_251121/to_dataset"

# Create the output folder up front; an already existing folder is fine.
os.makedirs(output_folder, exist_ok=True)
# ==============================================================
# Compute the end points of an arc
# ==============================================================
def compute_arc_ends(points: List[List[float]]) -> List[List[float]]:
    """Return the two end points of the arc described by three points.

    The centre of the circle through the three points is computed, and the
    pair of points with the largest angular separation around that centre
    (measured the short way round, so never more than pi) is returned. The
    order of the input points does not matter.

    Args:
        points: Three ``[x, y]`` points, in any order.

    Returns:
        A two-element list with the end points, taken from ``points`` and
        kept in their input order. ``[[0, 0], [0, 0]]`` is returned when
        ``points`` does not hold exactly three points. When no unique
        circle exists (collinear or coincident points) the two points that
        are farthest apart are returned.
    """
    if len(points) != 3:
        return [[0, 0], [0, 0]]

    (x1, y1), (x2, y2), (x3, y3) = points

    # Linear system for the circumcentre, from the perpendicular bisectors
    # of p1-p2 and p2-p3.
    a = 2 * (x2 - x1)
    b = 2 * (y2 - y1)
    c = x2**2 + y2**2 - x1**2 - y1**2
    d = 2 * (x3 - x2)
    e = 2 * (y3 - y2)
    f = x3**2 + y3**2 - x2**2 - y2**2
    denom = a * e - b * d

    pairs = [(0, 1), (0, 2), (1, 2)]

    if denom == 0:
        # No circumcentre: fall back to straight-line distance so the result
        # is still independent of the order the points were supplied in.
        spreads = [
            math.hypot(points[j][0] - points[i][0], points[j][1] - points[i][1])
            for i, j in pairs
        ]
    else:
        cx = (c * e - f * b) / denom
        cy = (a * f - d * c) / denom
        angles = [math.atan2(y - cy, x - cx) for x, y in points]

        def angle_diff(a1, a2):
            # Fold the difference into [0, pi]: the short way round the circle.
            diff = (a2 - a1) % (2 * math.pi)
            if diff > math.pi:
                diff = 2 * math.pi - diff
            return diff

        spreads = [angle_diff(angles[i], angles[j]) for i, j in pairs]

    # index() returns the first maximum, so ties resolve to the earliest pair.
    first, second = pairs[spreads.index(max(spreads))]
    return [points[first], points[second]]
# ==============================================================
# Match a point to its nearest ellipse
# ==============================================================
def match_point_to_ellipse(point: List[float], ellipses: List[Dict]) -> int:
    """Return the index of the ellipse whose centre is closest to ``point``.

    Args:
        point: The ``[x, y]`` point to match.
        ellipses: Dicts that each provide the centre under ``"cx"`` and
            ``"cy"``.

    Returns:
        The index into ``ellipses`` of the nearest centre; on a tie the
        earliest ellipse wins. ``-1`` when no ellipse is strictly closer
        than infinity, e.g. when ``ellipses`` is empty.

    Raises:
        KeyError: If an ellipse has no ``"cx"`` or ``"cy"`` entry.
    """
    px, py = point
    best_index = -1
    best_distance = math.inf
    for index, ellipse in enumerate(ellipses):
        distance = math.hypot(px - ellipse["cx"], py - ellipse["cy"])
        # Strict comparison keeps the first of several equally close centres.
        if distance < best_distance:
            best_index, best_distance = index, distance
    return best_index
# ==============================================================
# Read the ellipse parameter mapping from the CSV files
# ==============================================================
csv_ellipse_map = {}  # filename -> list of ellipse params
for csv_name in os.listdir(csv_folder):
    if csv_name.endswith(".csv"):
        # utf-8-sig drops a leading byte-order mark if the file has one.
        with open(os.path.join(csv_folder, csv_name), "r", encoding="utf-8-sig") as handle:
            for record in csv.DictReader(handle):
                filename = record["filename"].strip()
                raw_shape = record["region_shape_attributes"]
                try:
                    shape = json.loads(raw_shape)
                except json.JSONDecodeError:
                    # Retry once with doubled quotes collapsed to single ones.
                    shape = json.loads(raw_shape.replace('""', '"'))
                # Group every shape row under the file name it belongs to.
                csv_ellipse_map.setdefault(filename, []).append(shape)
# ==============================================================
# Walk the JSON files and merge in the CSV ellipse parameters
# ==============================================================
for json_file in os.listdir(json_folder_json):
    if not json_file.endswith(".json"):
        continue

    json_path = os.path.join(json_folder_json, json_file)
    # Swap only the trailing extension; str.replace() would also rewrite any
    # ".json" that occurs earlier in the file name.
    filename = os.path.splitext(json_file)[0] + ".jpg"  # name of the image
    img_path = os.path.join(json_folder_img, filename)  # images live in their own folder

    # The image must exist.
    if not os.path.exists(img_path):
        print(f"[WARN] Image not found for: {filename}")
        continue

    # The CSV data must contain a record for this image.
    if filename not in csv_ellipse_map:
        print(f"[WARN] No CSV ellipse for: {filename}")
        continue

    # Read the JSON.
    with open(json_path, "r", encoding="utf-8") as jf:
        data = json.load(jf)
    if "shapes" not in data:
        data["shapes"] = []

    # Collect the single-point "arc" annotations from the JSON.
    arc_points = [
        s["points"][0]
        for s in data["shapes"]
        if s.get("label") == "arc" and "points" in s and len(s["points"]) == 1
    ]

    # Ellipse records for this image, as read from the CSV files.
    ellipses = csv_ellipse_map[filename]
    ellipse_point_map = {i: [] for i in range(len(ellipses))}

    # Assign every arc point to its nearest ellipse.
    for pt in arc_points:
        idx = match_point_to_ellipse(pt, ellipses)
        ellipse_point_map[idx].append(pt)

    # Build one new arc shape per ellipse.
    new_arc_shapes = []
    for idx, pts in ellipse_point_map.items():
        if len(pts) != 3:
            # The shape is still emitted, with placeholder ends.
            print(f"[WARN] {filename} ellipse {idx} has {len(pts)} points (expected 3)")
            ends = [[0, 0], [0, 0]]
        else:
            ends = compute_arc_ends(pts)

        e = ellipses[idx]
        arc_shape = {
            "label": "arc",
            "points": pts,
            "params": [
                e.get("cx", 0),
                e.get("cy", 0),
                e.get("rx", 0),
                e.get("ry", 0),
                e.get("theta", 0),
            ],
            "ends": ends,
            "group_id": None,
            "description": "",
            "difficult": False,
            "shape_type": "arc",
            "flags": {},
            "attributes": {},
        }
        new_arc_shapes.append(arc_shape)

    # Drop every old "arc" shape and append the new ones.
    remaining = [s for s in data["shapes"] if s.get("label") != "arc"]
    data["shapes"] = remaining + new_arc_shapes

    # Write the JSON.
    output_json = os.path.join(output_folder, json_file)
    with open(output_json, "w", encoding="utf-8") as jf:
        json.dump(data, jf, ensure_ascii=False, indent=2)

    # Copy the image next to it.
    shutil.copy2(img_path, os.path.join(output_folder, filename))
    print(f"[OK] Saved merged data for: {filename}")

print("\nAll done! Output in:", output_folder)
|