"""Convert LabelMe line annotations into wireframe training targets.

For every LabelMe ``.json`` file in a directory this script reads the labelled
two-point line shapes, writes the matching image resized to 512x512 as a TIFF,
and writes a JSON file holding junction/line heatmaps and sampled positive and
negative line pairs computed on a 128x128 grid.
"""

import json
import os
from itertools import combinations

import imageio.v3 as iio
import numpy as np
import skimage
import skimage.draw  # `import skimage` alone does not guarantee the submodule is loaded
from scipy.ndimage import zoom
from tqdm import tqdm


def to_int(x):
    """Return the items of *x* converted with ``int`` as a tuple."""
    return tuple(map(int, x))


def convert_labelme_to_custom_json(labelme_json_path, label='dseam1'):
    """Read a LabelMe JSON file and collect its two-point line shapes.

    Args:
        labelme_json_path: Path of the LabelMe annotation file.
        label: Only shapes whose ``label`` equals this value are collected.

    Returns:
        A dict with ``image_id`` (the file name without extension) and
        ``lines`` (a list of ``[start_point, end_point]`` pairs), or ``None``
        when the file cannot be read, lacks ``imageHeight``/``imageWidth``,
        or contains no usable line. The failure reason is printed.
    """
    try:
        with open(labelme_json_path, 'r', encoding='utf-8') as f:
            labelme_data = json.load(f)

        for required_key in ('imageHeight', 'imageWidth'):
            if required_key not in labelme_data:
                raise KeyError(f"Missing '{required_key}' in JSON")

        lines = []
        for shape in labelme_data.get('shapes', []):
            if shape.get('label') != label:
                continue
            points = shape.get('points', [])
            if len(points) != 2:
                print(f"Warning: Skipping invalid line in {labelme_json_path}")
                continue
            lines.append([points[0], points[1]])

        if not lines:
            raise ValueError("No line segments found in the JSON")

        return {
            "image_id": os.path.splitext(os.path.basename(labelme_json_path))[0],
            "lines": lines,
        }
    except Exception as e:
        # Deliberately broad: one bad annotation file must not stop a batch;
        # the caller skips files for which None is returned.
        print(f"Error reading {labelme_json_path}: {e}")
        return None


def find_image_file(json_path, image_extensions=('.jpg', '.jpeg', '.png', '.bmp', '.tiff')):
    """Return the first existing ``<json stem><ext>`` path, or ``None``.

    Extensions are tried in the order given by *image_extensions*.
    """
    base_name = os.path.splitext(json_path)[0]
    for ext in image_extensions:
        image_path = f"{base_name}{ext}"
        if os.path.exists(image_path):
            return image_path
    return None


def _line_aa_in_bounds(p0, p1, shape):
    """Rasterize an anti-aliased line and keep only pixels inside *shape*."""
    rr, cc, value = skimage.draw.line_aa(*p0, *p1)
    # line_aa is not told the raster size; drop anything outside so a
    # negative index cannot wrap around and a large one cannot raise.
    inside = (rr >= 0) & (rr < shape[0]) & (cc >= 0) & (cc < shape[1])
    return rr[inside], cc[inside], value[inside]


def _tensor_entry(name, array):
    """Describe *array* as a ``{"name", "shape", "content"}`` dict."""
    return {"name": name, "shape": list(array.shape), "content": array.tolist()}


def _build_wire_targets(image, lines):
    """Compute the heatmaps and line samples returned by generate_heatmap_data."""
    heatmap_scale = (128, 128)

    fy, fx = heatmap_scale[1] / image.shape[0], heatmap_scale[0] / image.shape[1]
    jmap = np.zeros((1,) + heatmap_scale, dtype=np.float32)
    joff = np.zeros((1, 2) + heatmap_scale, dtype=np.float32)
    lmap = np.zeros(heatmap_scale, dtype=np.float32)

    # Work on a float copy: the caller's array stays untouched, and integer
    # input coordinates are not truncated when the scaled values are stored.
    lines = np.array(lines, dtype=np.float64).reshape(-1, 2, 2)
    lines[:, :, 0] = np.clip(lines[:, :, 0] * fx, 0, heatmap_scale[0] - 1e-4)
    lines[:, :, 1] = np.clip(lines[:, :, 1] * fy, 0, heatmap_scale[1] - 1e-4)
    # Swap (x, y) to (row, col) so points can index the maps directly.
    lines = lines[:, :, ::-1]

    junc = []
    jids = {}

    def jid(jun):
        """Return the index of junction *jun*, registering it on first use."""
        jun = tuple(jun[:2])
        if jun in jids:
            return jids[jun]
        jids[jun] = len(junc)
        junc.append(np.array(jun + (0,)))
        return len(junc) - 1

    lnid = []
    lpos, lneg = [], []
    for v0, v1 in lines:
        i0, i1 = jid(v0), jid(v1)
        lnid.append((i0, i1))
        lpos.append([junc[i0], junc[i1]])

        vint0, vint1 = to_int(v0), to_int(v1)
        jmap[0][vint0] = 1
        jmap[0][vint1] = 1
        rr, cc, value = _line_aa_in_bounds(vint0, vint1, lmap.shape)
        lmap[rr, cc] = np.maximum(lmap[rr, cc], value)

    for v in junc:
        vint = to_int(v[:2])
        # Offset of the junction from the centre of the cell containing it.
        joff[0, :, vint[0], vint[1]] = v[:2] - vint - 0.5

    # Negative candidates are scored against a half-resolution line map.
    llmap = zoom(lmap, [0.5, 0.5])
    lineset = {frozenset(pair) for pair in lnid}
    for i0, i1 in combinations(range(len(junc)), 2):
        if frozenset([i0, i1]) in lineset:
            continue
        v0, v1 = junc[i0], junc[i1]
        vint0, vint1 = to_int(v0[:2] / 2), to_int(v1[:2] / 2)
        rr, cc, value = _line_aa_in_bounds(vint0, vint1, llmap.shape)
        lneg.append([v0, v1, i0, i1, np.average(np.minimum(value, llmap[rr, cc]))])

    if not lneg:
        # Every junction pair is a labelled line: fall back to using all
        # pairs as negatives with a small random score.
        for i0, i1 in combinations(range(len(junc)), 2):
            lneg.append([junc[i0], junc[i1], i0, i1, np.random.uniform(0.01, 0.1)])

    # Highest-scoring negatives first.
    lneg.sort(key=lambda l: -l[-1])

    junc = np.array(junc, dtype=np.float32)
    Lpos = np.array(lnid, dtype=np.int32)
    Lneg = np.array([l[2:4] for l in lneg][:4000], dtype=np.int32)
    lpos = np.array(lpos, dtype=np.float32)
    lneg = np.array([l[:2] for l in lneg[:2000]], dtype=np.float32)

    return {
        "junc_map": _tensor_entry("junc_map", jmap),
        "junc_offset": _tensor_entry("junc_offset", joff),
        "line_map": _tensor_entry("line_map", lmap),
        "junc_coords": _tensor_entry("junc_coords", junc),
        "line_pos_idx": _tensor_entry("line_pos_idx", Lpos),
        "line_neg_idx": _tensor_entry("line_neg_idx", Lneg),
        "line_pos_coords": _tensor_entry("line_pos_coords", lpos),
        "line_neg_coords": _tensor_entry("line_neg_coords", lneg),
    }


def generate_heatmap_data(image, lines):
    """Build junction/line heatmaps and line samples on a 128x128 grid.

    Args:
        image: Array whose ``shape[0]`` and ``shape[1]`` are the height and
            width used to scale the line coordinates onto the grid.
        lines: Array-like of shape ``(N, 2, 2)`` holding ``(x, y)`` end points
            in image pixels. It is not modified.

    Returns:
        A dict of ``{"name", "shape", "content"}`` entries keyed by
        ``junc_map``, ``junc_offset``, ``line_map``, ``junc_coords``,
        ``line_pos_idx``, ``line_neg_idx``, ``line_pos_coords`` and
        ``line_neg_coords``. At most 4000 negative index pairs and 2000
        negative coordinate pairs are kept.

    Raises:
        Exception: Any error raised while building the data is printed and
            then re-raised unchanged.
    """
    try:
        return _build_wire_targets(image, lines)
    except Exception as e:
        print(f"Error generating heatmap data: {e}")
        raise


def batch_process(labelme_json_dir, output_dir):
    """Convert every LabelMe ``.json`` file in *labelme_json_dir*.

    For each annotation that has usable lines and a matching image, writes
    ``<stem>.tiff`` (the image resized to 512x512) and ``<stem>.json`` (the
    wireframe targets) into *output_dir*. Files that fail are reported with
    ``print`` and skipped.
    """
    json_files = sorted(f for f in os.listdir(labelme_json_dir) if f.endswith('.json'))
    os.makedirs(output_dir, exist_ok=True)

    for filename in tqdm(json_files, desc="Processing JSON files"):
        labelme_json_path = os.path.join(labelme_json_dir, filename)
        stem = os.path.splitext(filename)[0]

        try:
            custom_json_data = convert_labelme_to_custom_json(labelme_json_path)
            if custom_json_data is None:
                continue

            image_path = find_image_file(labelme_json_path)
            if not image_path:
                print(f"Image not found: {labelme_json_path}")
                continue

            image = iio.imread(image_path)
            if image.ndim == 2:
                image = image[:, :, np.newaxis]
            elif image.ndim == 3 and image.shape[2] == 4:
                image = image[:, :, :3]  # Drop alpha if needed

            # Save the resized TIFF (keeping the original bit depth).
            zoom_factors = [512 / image.shape[0], 512 / image.shape[1], 1]
            resized_image = zoom(image, zoom_factors, order=0)
            resized_image = resized_image.astype(image.dtype)
            image_save_path = os.path.join(output_dir, stem + '.tiff')
            iio.imwrite(image_save_path, resized_image)

            # Heatmap processing. Build a float array so integer pixel
            # coordinates keep their sub-cell position after scaling.
            lines = np.array(custom_json_data['lines'], dtype=np.float64).reshape(-1, 2, 2)
            wires_data = generate_heatmap_data(image, lines)

            # Write the output JSON file.
            custom_json_data.pop("lines", None)
            custom_json_data.update({
                "boxes": [],
                "segmentations": [],
                "wires": [wires_data]
            })

            output_json_path = os.path.join(output_dir, stem + '.json')
            with open(output_json_path, 'w', encoding='utf-8') as f:
                json.dump(custom_json_data, f, separators=(',', ':'))

        except Exception as e:
            # Per-file boundary: report the failure and go on to the next file.
            print(f"Error processing {labelme_json_path}: {e}")


if __name__ == "__main__":
    labelme_json_dir = r"\\192.168.50.222\share\zyh\512\init\paomo"
    custom_json_output_dir = r"\\192.168.50.222\share\zyh\512\init\target"
    batch_process(labelme_json_dir, custom_json_output_dir)