|
|
@@ -0,0 +1,321 @@
|
|
|
+import os
|
|
|
+import json
|
|
|
+import cv2
|
|
|
+import numpy as np
|
|
|
+import random
|
|
|
+import copy
|
|
|
+import shutil
|
|
|
+from typing import List, Tuple, Dict, Any
|
|
|
+
|
|
|
+# 数据增强配置
|
|
|
+config = {
|
|
|
+ 'rotate': True,
|
|
|
+ 'crop': True,
|
|
|
+ 'occlude': False,
|
|
|
+ 'flip': True,
|
|
|
+ 'brightness': True,
|
|
|
+ 'blur': True,
|
|
|
+ 'noise': True,
|
|
|
+ 'scale': True,
|
|
|
+ 'color_jitter': True,
|
|
|
+ 'adjust_brightness_contrast': True,
|
|
|
+ 'shadow': True,
|
|
|
+ 'highlight': True
|
|
|
+}
|
|
|
+
|
|
|
+# --------------------------- 增强模块 ---------------------------
|
|
|
+
|
|
|
+class Compose:
|
|
|
+ def __init__(self, transforms: list):
|
|
|
+ self.transforms = transforms
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ for t in self.transforms:
|
|
|
+ image, label_data = t(image, label_data)
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomFlip:
|
|
|
+ def __init__(self, probability: float = 0.5):
|
|
|
+ self.probability = probability
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+ flip_code = random.choice([-1, 0, 1])
|
|
|
+ flipped_image = cv2.flip(image, flip_code)
|
|
|
+ h, w = image.shape[:2]
|
|
|
+
|
|
|
+ new_shapes = []
|
|
|
+ for shape in label_data['shapes']:
|
|
|
+ new_shape = copy.deepcopy(shape)
|
|
|
+ new_points = []
|
|
|
+ for x, y in shape['points']:
|
|
|
+ new_x = w - x if flip_code in [-1, 1] else x
|
|
|
+ new_y = h - y if flip_code in [-1, 0] else y
|
|
|
+ new_points.append([new_x, new_y])
|
|
|
+ new_shape['points'] = new_points
|
|
|
+ new_shapes.append(new_shape)
|
|
|
+
|
|
|
+ label_data['shapes'] = new_shapes
|
|
|
+ return flipped_image, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomBrightness:
|
|
|
+ def __init__(self, probability: float = 0.5, delta: int = 30):
|
|
|
+ self.probability = probability
|
|
|
+ self.delta = delta
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+ value = random.randint(-self.delta, self.delta)
|
|
|
+ hsv = cv2.cvtColor(image, cv2.COLOR_BGR2HSV)
|
|
|
+ hsv[:, :, 2] = np.clip(hsv[:, :, 2] + value, 0, 255)
|
|
|
+ return cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR), label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomBlur:
|
|
|
+ def __init__(self, probability: float = 0.5, ksize: Tuple[int, int] = (5, 5)):
|
|
|
+ self.probability = probability
|
|
|
+ self.ksize = ksize
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+ return cv2.GaussianBlur(image, self.ksize, 0), label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomNoise:
|
|
|
+ def __init__(self, probability: float = 0.5, noise_level: int = 25):
|
|
|
+ self.probability = probability
|
|
|
+ self.noise_level = noise_level
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+ noise = np.random.randint(-self.noise_level, self.noise_level, image.shape, dtype=np.int16)
|
|
|
+ noisy_image = np.clip(image.astype(np.int16) + noise, 0, 255).astype(np.uint8)
|
|
|
+ return noisy_image, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomScale:
|
|
|
+ def __init__(self, probability: float = 0.5, scale_range: Tuple[float, float] = (0.8, 1.2)):
|
|
|
+ self.probability = probability
|
|
|
+ self.scale_range = scale_range
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ scale = random.uniform(*self.scale_range)
|
|
|
+ new_w, new_h = int(w * scale), int(h * scale)
|
|
|
+ scaled_image = cv2.resize(image, (new_w, new_h))
|
|
|
+
|
|
|
+ new_shapes = []
|
|
|
+ for shape in label_data['shapes']:
|
|
|
+ new_shape = copy.deepcopy(shape)
|
|
|
+ new_points = [[x * scale, y * scale] for x, y in shape['points']]
|
|
|
+ new_shape['points'] = new_points
|
|
|
+ new_shapes.append(new_shape)
|
|
|
+
|
|
|
+ label_data['shapes'] = new_shapes
|
|
|
+ label_data['imageHeight'] = new_h
|
|
|
+ label_data['imageWidth'] = new_w
|
|
|
+ return scaled_image, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomColorJitter:
|
|
|
+ def __init__(self, probability: float = 0.5, alpha: float = 0.3):
|
|
|
+ self.probability = probability
|
|
|
+ self.alpha = alpha
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+ factors = np.random.uniform(1 - self.alpha, 1 + self.alpha, 3)
|
|
|
+ img = image.astype(np.float32)
|
|
|
+ img = np.clip(img * factors, 0, 255).astype(np.uint8)
|
|
|
+ return img, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomRotate:
|
|
|
+ def __init__(self, probability: float = 0.7):
|
|
|
+ self.probability = probability
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ angle = random.uniform(-30, 30)
|
|
|
+ M = cv2.getRotationMatrix2D((w / 2, h / 2), angle, 1.0)
|
|
|
+ rotated = cv2.warpAffine(image, M, (w, h), flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_CONSTANT)
|
|
|
+
|
|
|
+ new_shapes = []
|
|
|
+ for shape in label_data['shapes']:
|
|
|
+ new_shape = copy.deepcopy(shape)
|
|
|
+ new_points = []
|
|
|
+ for x, y in shape['points']:
|
|
|
+ x_new = M[0][0] * x + M[0][1] * y + M[0][2]
|
|
|
+ y_new = M[1][0] * x + M[1][1] * y + M[1][2]
|
|
|
+ new_points.append([x_new, y_new])
|
|
|
+ new_shape['points'] = new_points
|
|
|
+ new_shapes.append(new_shape)
|
|
|
+
|
|
|
+ label_data['shapes'] = new_shapes
|
|
|
+ return rotated, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomCrop:
|
|
|
+ def __init__(self, probability: float = 0.7):
|
|
|
+ self.probability = probability
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ crop_w = random.randint(int(w * 0.6), w)
|
|
|
+ crop_h = random.randint(int(h * 0.6), h)
|
|
|
+ x = random.randint(0, w - crop_w)
|
|
|
+ y = random.randint(0, h - crop_h)
|
|
|
+ cropped = image[y:y + crop_h, x:x + crop_w]
|
|
|
+
|
|
|
+ new_shapes = []
|
|
|
+ for shape in label_data['shapes']:
|
|
|
+ new_shape = copy.deepcopy(shape)
|
|
|
+ new_points = [[max(0, px - x), max(0, py - y)] for px, py in shape['points']]
|
|
|
+ new_shape['points'] = new_points
|
|
|
+ new_shapes.append(new_shape)
|
|
|
+
|
|
|
+ label_data['shapes'] = new_shapes
|
|
|
+ label_data['imageHeight'] = crop_h
|
|
|
+ label_data['imageWidth'] = crop_w
|
|
|
+ return cropped, label_data
|
|
|
+
|
|
|
+
|
|
|
+class RandomOcclusion:
|
|
|
+ def __init__(self, probability: float = 0.7):
|
|
|
+ self.probability = probability
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ if random.random() > self.probability:
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+ h, w = image.shape[:2]
|
|
|
+ for _ in range(random.randint(1, 3)):
|
|
|
+ ow = random.randint(80, 150)
|
|
|
+ oh = random.randint(80, 150)
|
|
|
+ x = random.randint(0, w - ow)
|
|
|
+ y = random.randint(0, h - oh)
|
|
|
+ cv2.rectangle(image, (x, y), (x + ow, y + oh), (0, 0, 0), -1)
|
|
|
+ return image, label_data
|
|
|
+
|
|
|
+
|
|
|
+class AdjustBrightnessContrast:
|
|
|
+ def __init__(self, brightness=0, contrast=1.0):
|
|
|
+ self.brightness = brightness
|
|
|
+ self.contrast = contrast
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ return cv2.convertScaleAbs(image, alpha=self.contrast, beta=self.brightness), label_data
|
|
|
+
|
|
|
+
|
|
|
+class SimulateShadow:
|
|
|
+ def __init__(self, shadow_intensity=0.5):
|
|
|
+ self.shadow_intensity = shadow_intensity
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ h = image.shape[0]
|
|
|
+ shadow = np.zeros_like(image)
|
|
|
+ for i in range(h):
|
|
|
+ shadow[i] = int(255 * self.shadow_intensity * (1 - i / h))
|
|
|
+ return cv2.addWeighted(image, 1, shadow, 0.5, 0), label_data
|
|
|
+
|
|
|
+
|
|
|
+class SimulateHighlight:
|
|
|
+ def __init__(self, highlight_intensity=0.5):
|
|
|
+ self.highlight_intensity = highlight_intensity
|
|
|
+
|
|
|
+ def __call__(self, image: np.ndarray, label_data: Dict) -> Tuple[np.ndarray, Dict]:
|
|
|
+ h = image.shape[0]
|
|
|
+ highlight = np.zeros_like(image)
|
|
|
+ for i in range(h):
|
|
|
+ highlight[i] = int(255 * self.highlight_intensity * (i / h))
|
|
|
+ return cv2.addWeighted(image, 1, highlight, 0.5, 0), label_data
|
|
|
+
|
|
|
+
|
|
|
+# ------------------------ 工具函数 ------------------------
|
|
|
+
|
|
|
+def build_transforms(config: Dict) -> List[Tuple[str, Any]]:
|
|
|
+ transforms = []
|
|
|
+ if config.get('rotate'): transforms.append(('rotate', RandomRotate()))
|
|
|
+ if config.get('crop'): transforms.append(('crop', RandomCrop()))
|
|
|
+ if config.get('occlude'): transforms.append(('occlude', RandomOcclusion()))
|
|
|
+ if config.get('flip'): transforms.append(('flip', RandomFlip()))
|
|
|
+ if config.get('brightness'): transforms.append(('brightness', RandomBrightness()))
|
|
|
+ if config.get('blur'): transforms.append(('blur', RandomBlur()))
|
|
|
+ if config.get('noise'): transforms.append(('noise', RandomNoise()))
|
|
|
+ if config.get('scale'): transforms.append(('scale', RandomScale()))
|
|
|
+ if config.get('color_jitter'): transforms.append(('color_jitter', RandomColorJitter()))
|
|
|
+ if config.get('adjust_brightness_contrast'):
|
|
|
+ transforms.append(('adjust_brightness_contrast', AdjustBrightnessContrast(brightness=30, contrast=1.5)))
|
|
|
+ if config.get('shadow'): transforms.append(('shadow', SimulateShadow(shadow_intensity=0.3)))
|
|
|
+ if config.get('highlight'): transforms.append(('highlight', SimulateHighlight(highlight_intensity=0.3)))
|
|
|
+ random.shuffle(transforms)
|
|
|
+ return transforms
|
|
|
+
|
|
|
+
|
|
|
+def process_file_separately(image_path: str, label_path: str, output_dir: str, transforms: List[Tuple[str, Any]]):
|
|
|
+ image = cv2.imread(image_path)
|
|
|
+ if image is None:
|
|
|
+ print(f"无法读取图像: {image_path}")
|
|
|
+ return
|
|
|
+
|
|
|
+ with open(label_path, 'r') as f:
|
|
|
+ label_data = json.load(f)
|
|
|
+
|
|
|
+ base_name = os.path.splitext(os.path.basename(image_path))[0]
|
|
|
+ for transform_name, transform in transforms:
|
|
|
+ transformed_image, transformed_label = transform(image.copy(), copy.deepcopy(label_data))
|
|
|
+ transformed_label['imageHeight'] = transformed_image.shape[0]
|
|
|
+ transformed_label['imageWidth'] = transformed_image.shape[1]
|
|
|
+
|
|
|
+ output_img = os.path.join(output_dir, f"{base_name}_{transform_name}.jpg")
|
|
|
+ output_json = os.path.join(output_dir, f"{base_name}_{transform_name}.json")
|
|
|
+ cv2.imwrite(output_img, transformed_image)
|
|
|
+ with open(output_json, 'w') as f:
|
|
|
+ json.dump(transformed_label, f, indent=2)
|
|
|
+
|
|
|
+
|
|
|
+def process_dataset(input_dir: str, output_dir: str, config: Dict):
|
|
|
+ os.makedirs(output_dir, exist_ok=True)
|
|
|
+ transforms = build_transforms(config)
|
|
|
+ count = 0
|
|
|
+
|
|
|
+ for fname in os.listdir(input_dir):
|
|
|
+ if fname.lower().endswith(('.jpg', '.jpeg', '.png')):
|
|
|
+ image_path = os.path.join(input_dir, fname)
|
|
|
+ label_path = os.path.join(input_dir, os.path.splitext(fname)[0] + '.json')
|
|
|
+
|
|
|
+ if os.path.exists(label_path):
|
|
|
+ shutil.copy(image_path, os.path.join(output_dir, fname))
|
|
|
+ shutil.copy(label_path, os.path.join(output_dir, os.path.basename(label_path)))
|
|
|
+ process_file_separately(image_path, label_path, output_dir, transforms)
|
|
|
+ count += 1
|
|
|
+ else:
|
|
|
+ print(f"跳过 {fname},缺少对应 JSON 文件")
|
|
|
+
|
|
|
+ print(f"\n处理完成,共处理 {count} 个样本")
|
|
|
+ print(f"输出路径:{os.path.abspath(output_dir)}")
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == "__main__":
|
|
|
+ input_dir = r"G:\python_ws_g\data\total"
|
|
|
+ # input_dir = r"G:\python_ws_g\data\total"
|
|
|
+ parent_dir = os.path.dirname(input_dir)
|
|
|
+ output_dir = os.path.join(parent_dir, "a_strong_output")
|
|
|
+ process_dataset(input_dir, output_dir, config)
|