123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494 |
- import math
- import os.path
- import re
- import sys
- import PIL.Image
- import torch
- import numpy as np
- import matplotlib.pyplot as plt
- import torchvision.transforms
- import torchvision.transforms.functional as F
- from torch.utils.data import DataLoader
- from torchvision.transforms import v2
- from torchvision.utils import make_grid, draw_bounding_boxes
- from torchvision.io import read_image
- from pathlib import Path
- from torchvision.models.detection import maskrcnn_resnet50_fpn_v2, MaskRCNN_ResNet50_FPN_V2_Weights
- # PyTorch TensorBoard support
- from torch.utils.tensorboard import SummaryWriter
- import cv2
- from sklearn.cluster import DBSCAN
- from test.MaskRCNN import MaskRCNNDataset
- from tools import utils
- import pandas as pd
# Save figures with a tight bounding box so exported plots have no extra margin.
plt.rcParams["savefig.bbox"] = 'tight'
# Source and destination roots for the Severstal steel-defect dataset.
# NOTE(review): both point at the same directory — converted images/labels are
# written back under subfolders of the original dataset root.
orig_path = r'F:\Downloads\severstal-steel-defect-detection'
dst_path = r'F:\Downloads\severstal-steel-defect-detection'
def show(imgs):
    """Display one image tensor, or a list of them, in a single row.

    Axis ticks and labels are stripped so only the image content is shown.
    """
    img_list = imgs if isinstance(imgs, list) else [imgs]
    fig, axs = plt.subplots(ncols=len(img_list), squeeze=False)
    for col, tensor in enumerate(img_list):
        pil_img = F.to_pil_image(tensor.detach())
        ax = axs[0, col]
        ax.imshow(np.asarray(pil_img))
        ax.set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])
    plt.show()
def train_one_epoch(model, optimizer, data_loader, device, epoch, print_freq, scaler=None):
    """Train ``model`` for one epoch over ``data_loader``.

    Args:
        model: detection model that returns a dict of losses in train mode.
        optimizer: optimizer stepping the model's parameters.
        data_loader: yields ``(images, targets)`` batches.
        device: device that images and tensor target values are moved to.
        epoch: current epoch index; epoch 0 gets a linear LR warmup.
        print_freq: iteration interval for the metric logger's console output.
        scaler: optional ``torch.cuda.amp.GradScaler``; when given, the
            forward pass runs under autocast and the backward/step path is
            scaled (mixed precision).

    Returns:
        The ``utils.MetricLogger`` holding smoothed loss/lr values.
    """
    model.train()
    metric_logger = utils.MetricLogger(delimiter=" ")
    metric_logger.add_meter("lr", utils.SmoothedValue(window_size=1, fmt="{value:.6f}"))
    header = f"Epoch: [{epoch}]"
    lr_scheduler = None
    if epoch == 0:
        # Linear warmup over (at most) the first 1000 iterations of epoch 0.
        warmup_factor = 1.0 / 1000
        warmup_iters = min(1000, len(data_loader) - 1)
        lr_scheduler = torch.optim.lr_scheduler.LinearLR(
            optimizer, start_factor=warmup_factor, total_iters=warmup_iters
        )
    for images, targets in metric_logger.log_every(data_loader, print_freq, header):
        images = list(image.to(device) for image in images)
        # Only tensor values are moved; non-tensor target entries pass through.
        targets = [{k: v.to(device) if isinstance(v, torch.Tensor) else v for k, v in t.items()} for t in targets]
        # Autocast is active only when a GradScaler was supplied.
        with torch.cuda.amp.autocast(enabled=scaler is not None):
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())
        # reduce losses over all GPUs for logging purposes
        loss_dict_reduced = utils.reduce_dict(loss_dict)
        losses_reduced = sum(loss for loss in loss_dict_reduced.values())
        loss_value = losses_reduced.item()
        # Abort on NaN/inf loss rather than continuing with corrupted weights.
        if not math.isfinite(loss_value):
            print(f"Loss is {loss_value}, stopping training")
            print(loss_dict_reduced)
            sys.exit(1)
        optimizer.zero_grad()
        if scaler is not None:
            scaler.scale(losses).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            losses.backward()
            optimizer.step()
        if lr_scheduler is not None:
            # Warmup scheduler steps per iteration, not per epoch.
            lr_scheduler.step()
        metric_logger.update(loss=losses_reduced, **loss_dict_reduced)
        metric_logger.update(lr=optimizer.param_groups[0]["lr"])
    return metric_logger
def train():
    """Placeholder for the full training loop; not implemented yet."""
    pass
def trans_datasets_format():
    """Convert Severstal ``train.csv`` RLE annotations into per-image label files.

    For each CSV row: copy the image into ``images/train``, decode the RLE
    mask with :func:`compute_mask`, split it into instances with
    :func:`cluster_dbscan`, and append one line per instance of
    "<class> x1 y1 x2 y2 ..." normalized point coordinates to
    ``labels/train/<image>.txt``.
    """
    df = pd.read_csv(os.path.join(orig_path, 'train.csv'))
    print(df.head())
    for row in df.itertuples():
        img_name = getattr(row, 'ImageId')
        img_path = os.path.join(orig_path + '/train_images', img_name)
        dst_img_path = os.path.join(dst_path + '/images/train', img_name)
        dst_label_path = os.path.join(dst_path + '/labels/train', img_name[:-3] + 'txt')
        print(f'dst label:{dst_label_path}')
        im = cv2.imread(img_path)
        cv2.imwrite(dst_img_path, im)
        img = PIL.Image.open(img_path)
        height, width = im.shape[:2]
        print(f'cv2 size:{im.shape}')
        label, mask = compute_mask(row, img.size)
        lbls, ins_masks = cluster_dbscan(mask, img)
        # 'a+' so multiple CSV rows for the same image append extra lines.
        with open(dst_label_path, 'a+') as writer:
            for ins_mask in ins_masks:
                # Build the label line as a list then join once (the original
                # grew a string with += per point, which is quadratic).
                parts = [str(label)]
                for h, w in ins_mask:
                    # Normalize (col, row) -> (x, y) in [0, 1].
                    parts.append(str(w / width))
                    parts.append(str(h / height))
                lbl_data = ' '.join(parts) + ' '
                writer.write(lbl_data + '\n')
                print(f'lbl_data:{lbl_data}')
        # FIX: removed redundant writer.close() — the with-block closes it.
        print(f'label:{label}')
def compute_mask(row, shape):
    """Decode a Severstal run-length-encoded annotation into a 2-D mask.

    Args:
        row: DataFrame itertuple with ``ClassId`` and ``EncodedPixels``
            (space-separated "start length" pairs; starts are 1-indexed and
            column-major, per the Kaggle RLE convention).
        shape: ``(width, height)`` of the image, i.e. ``PIL.Image.size``.

    Returns:
        ``(label, mask)`` where ``mask`` has shape ``(height, width)`` and
        foreground pixels are 255.
    """
    width, height = shape
    print(f'shape:{shape}')
    flat = np.zeros(width * height, dtype=np.uint8)
    runs = np.array(list(map(int, row.EncodedPixels.split())))
    label = row.ClassId
    # FIX: RLE starts are 1-based; the original used them as 0-based slice
    # starts, shifting every run by one pixel.
    mask_start = runs[0::2] - 1
    mask_length = runs[1::2]
    for s, l in zip(mask_start, mask_length):
        flat[s:s + l] = 255
    # Runs are column-major: reshape to (width, height), then transpose so the
    # result is the usual (height, width) layout.
    mask = flat.reshape((width, height)).T
    return label, mask
def cluster_dbscan(mask, image):
    """Group the foreground pixels of a mask into instances with DBSCAN.

    Returns ``(unique_labels, clustered_points)``: the set of DBSCAN cluster
    labels (may contain -1 for noise) and, for each label, the array of
    (row, col) pixel coordinates belonging to that cluster.
    """
    # Binarize the mask, then flatten so foreground positions become linear
    # indices into the flattened array.
    _, binary = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
    flat = binary.flatten()
    fg_indices = np.argwhere(flat == 255)
    # Convert the linear indices back into 2-D (row, col) coordinates.
    n_cols = binary.shape[1]
    fg_coords = np.column_stack((fg_indices // n_cols, fg_indices % n_cols))
    # eps: neighborhood radius in pixels; min_samples: minimum cluster size.
    clustering = DBSCAN(eps=3, min_samples=10).fit(fg_coords)
    cluster_labels = clustering.labels_
    print(f'labels:{cluster_labels}')
    unique_labels = set(cluster_labels)
    print(f'unique_labels:{unique_labels}')
    clustered_points = []
    for lbl in unique_labels:
        clustered_points.append(fg_coords[cluster_labels == lbl])
    return unique_labels, clustered_points
def show_cluster_dbscan(mask,image,unique_labels,clustered_points,):
    """Visualize a DBSCAN clustering: original image, binary mask, and an
    image with each cluster painted in its own color.

    Args:
        mask: 2-D uint8 mask (foreground where value > 127).
        image: array the colored overlay is shaped after (H x W x C).
        unique_labels: set of cluster labels from ``cluster_dbscan``.
        clustered_points: per-cluster arrays of (row, col) coordinates.
    """
    print(f'mask shape:{mask.shape}')
    # Binarize the mask.
    _, mask_binary = cv2.threshold(mask, 127, 255, cv2.THRESH_BINARY)
    # Flatten and collect foreground positions (kept for parity with
    # cluster_dbscan; not actually used below).
    mask_flattened = mask_binary.flatten()
    foreground_pixels = np.argwhere(mask_flattened == 255)
    print(f'image shape:{image.shape}')
    # Blank canvas for the per-cluster coloring.
    clustered_image = np.zeros_like(image)
    print(f'clustered_image shape:{clustered_image.shape}')
    # One color per cluster, sampled evenly from the Spectral colormap.
    colors =np.array( [plt.cm.Spectral(each) for each in np.linspace(0, 1, len(unique_labels))])
    plt.figure(figsize=(12, 6))
    # Paint every pixel of each cluster with that cluster's color.
    for points_coord,col in zip(clustered_points,colors):
        for coord in points_coord:
            clustered_image[coord[0], coord[1]] = (np.array(col[:3]) * 255)
    print(f'clustered_points:{len(clustered_points)}')
    # Show the original image, the mask, and the clustered result side by side.
    plt.subplot(131), plt.imshow(image), plt.title('Original Image')
    plt.subplot(132), plt.imshow(mask_binary, cmap='gray'), plt.title('Mask')
    plt.subplot(133), plt.imshow(clustered_image.astype(np.uint8)), plt.title('Clustered Image')
    plt.show()
def test():
    """Smoke-test Mask R-CNN: run a TorchScripted model on a dummy input, log
    its graph to TensorBoard, and display the sample dog images as a grid."""
    dog1_int = read_image(str(Path('./assets') / 'dog1.jpg'))
    dog2_int = read_image(str(Path('./assets') / 'dog2.jpg'))
    dog_list = [dog1_int, dog2_int]
    grid = make_grid(dog_list)
    weights = MaskRCNN_ResNet50_FPN_V2_Weights.DEFAULT
    transforms = weights.transforms()
    images = [transforms(d) for d in dog_list]
    # Dummy input of shape (1, 3, 800, 800) used for graph tracing.
    dummy_input = torch.randn(1, 3, 800, 800)
    model = maskrcnn_resnet50_fpn_v2(weights=weights, progress=False)
    model = model.eval()
    # Script the model so its graph can be written to TensorBoard.
    scripted_model = torch.jit.script(model)
    output = model(dummy_input)
    print(f'output:{output}')
    writer = SummaryWriter('runs/')
    writer.add_graph(scripted_model, input_to_model=dummy_input)
    writer.flush()
    # torch.onnx.export(models,images, f='maskrcnn.onnx')  # export to .onnx
    # netron.start('AlexNet.onnx')  # view the exported graph
    show(grid)
def test_mask():
    """Sanity-check a converted label file against the original CSV mask.

    Rebuilds a point mask from ``labels/train/<name>.txt`` and shows it in an
    OpenCV window, then decodes the same image's mask from ``train.csv`` and
    shows it as a matplotlib overlay for visual comparison.
    """
    name = 'fdb7c0397'
    label_path = os.path.join(dst_path + '/labels/train', name + '.txt')
    img_path = os.path.join(orig_path + '/train_images', name + '.jpg')
    # Severstal images are 256 x 1600 (H x W).
    mask_h, mask_w = 256, 1600
    mask = np.zeros((mask_h, mask_w), dtype=np.uint8)
    df = pd.read_csv(os.path.join(orig_path, 'train.csv'))
    print(df.head())
    points = []
    with open(label_path, 'r') as reader:
        for line in reader:
            parts = line.strip().split()
            class_id = int(parts[0])
            x_array = parts[1::2]
            y_array = parts[2::2]
            for x, y in zip(x_array, y_array):
                # FIX: coordinates were stored normalized by the full
                # dimensions (x = w/width, y = h/height), so scale back by
                # 256/1600 — the original used y * 255, an off-by-one.
                points.append((int(float(y) * mask_h), int(float(x) * mask_w)))
    print(f'points:{points}')
    for p in points:
        mask[p] = 255
    cv2.imshow('mask', mask)
    for row in df.itertuples():
        img_name = name + '.jpg'
        if img_name == getattr(row, 'ImageId'):
            img = PIL.Image.open(img_path)
            # FIX: PIL's Image.size is (width, height); the original unpacked
            # it as (height, width).
            width, height = img.size
            print(f'img size:{img.size}')
            label, csv_mask = compute_mask(row, img.size)
            plt.imshow(img)
            plt.imshow(csv_mask, cmap='Reds', alpha=0.3)
            plt.show()
    cv2.waitKey(0)
def show_img_mask(img_path):
    """Overlay the instance masks from an image's label file onto the image.

    The label path is derived by swapping ``\\images\\`` for ``\\labels\\``
    and changing the extension to ``.txt``. Each label line is
    "<class> x1 y1 x2 y2 ..." with coordinates normalized to [0, 1]; each
    line becomes one colored mask drawn over the image.
    """
    test_img = PIL.Image.open(img_path)
    w, h = test_img.size
    test_img = torchvision.transforms.ToTensor()(test_img)
    test_img = test_img.permute(1, 2, 0)  # CHW -> HWC for imshow
    print(f'test_img shape:{test_img.shape}')
    lbl_path = re.sub(r'\\images\\', r'\\labels\\', img_path[:-3]) + 'txt'
    masks = []
    labels = []
    with open(lbl_path, 'r') as reader:
        lines = reader.readlines()
        # One distinct color per instance line.
        colors = np.array([plt.cm.Spectral(each) for each in np.linspace(0, 1, len(lines))])
        print(f'colors:{colors * 255}')
        for line, col in zip(lines, colors):
            print(f'col:{np.array(col[:3]) * 255}')
            mask = torch.zeros(test_img.shape, dtype=torch.uint8)
            parts = line.strip().split()
            cls = torch.tensor(int(parts[0]), dtype=torch.int64)
            labels.append(cls)
            # FIX: reset the point list per line — previously it accumulated
            # across lines, so every later mask also contained all earlier
            # instances' points.
            mask_points = []
            x_array = parts[1::2]
            y_array = parts[2::2]
            for x, y in zip(x_array, y_array):
                # Denormalize to (row, col) pixel coordinates.
                mask_points.append((int(float(y) * h), int(float(x) * w)))
            for p in mask_points:
                mask[p] = torch.tensor(np.array(col[:3]) * 255)
            masks.append(mask)
    # FIX: removed redundant reader.close() — the with-block closes it.
    target = {}
    target["masks"] = torch.stack(masks)
    print(f'target:{target}')
    fig, axs = plt.subplots(2, 1)
    print(f'test_img:{test_img * 255}')
    axs[0].imshow(test_img)
    axs[0].axis('off')
    axs[1].axis('off')
    axs[1].imshow(test_img * 255)
    for img_mask in target['masks']:
        axs[1].imshow(img_mask, alpha=0.3)
    plt.show()
def show_dataset():
    """Load one batch from MaskRCNNDataset and visualize sample 2 with its
    mask overlay and red bounding boxes."""
    # NOTE(review): these globals look unnecessary — nothing else visible in
    # this file reads them; confirm before removing.
    global transforms, dataset, imgs
    transforms = v2.Compose([
        # Augmentations (resize/distort/flip) disabled for visualization.
        v2.ToTensor()
    ])
    dataset = MaskRCNNDataset(dataset_path=r'F:\Downloads\severstal-steel-defect-detection', transforms=transforms,
                              dataset_type='train')
    dataloader = DataLoader(dataset, batch_size=4, shuffle=False, collate_fn=utils.collate_fn)
    imgs, targets = next(iter(dataloader))
    mask = np.array(targets[2]['masks'][0])
    boxes = targets[2]['boxes']
    print(f'boxes:{boxes}')
    # CHW float tensor -> HWC uint8 array for OpenCV blending.
    img = np.array(imgs[2].permute(1, 2, 0)) * 255
    img = img.astype(np.uint8)
    print(f'img shape:{img.shape}')
    print(f'mask:{mask.shape}')
    # Put the single-channel mask into one channel of a 3-channel image so it
    # can be alpha-blended with the image.
    mask_3channel = cv2.merge([np.zeros_like(mask), np.zeros_like(mask), mask])
    print(f'mask_3channel:{mask_3channel.shape}')
    masked_image = cv2.addWeighted(img, 1, mask_3channel, 0.6, 0)
    plt.imshow(imgs[0].permute(1, 2, 0))
    plt.imshow(mask, cmap='Reds', alpha=0.3)
    drawn_boxes = draw_bounding_boxes((imgs[2] * 255).to(torch.uint8), boxes, colors="red", width=5)
    plt.imshow(drawn_boxes.permute(1, 2, 0))
    plt.show()
    cv2.waitKey(0)
def test_cluster(img_path):
    """Cluster the first instance mask of an image with DBSCAN and display
    the result (original image / binary mask / colored clusters)."""
    test_img = PIL.Image.open(img_path)
    w, h = test_img.size
    test_img = torchvision.transforms.ToTensor()(test_img)
    test_img = (test_img.permute(1, 2, 0).numpy() * 255).astype(np.uint8)
    lbl_path = re.sub(r'\\images\\', r'\\labels\\', img_path[:-3]) + 'txt'
    masks = []
    labels = []
    with open(lbl_path, 'r') as reader:
        for line in reader:
            mask = torch.zeros((h, w), dtype=torch.uint8)
            parts = line.strip().split()
            cls = torch.tensor(int(parts[0]), dtype=torch.int64)
            labels.append(cls)
            # FIX: build the point list per line — previously it accumulated
            # across lines, so each later mask also included all earlier
            # instances' points.
            mask_points = []
            for x, y in zip(parts[1::2], parts[2::2]):
                # Denormalize to (row, col) pixel coordinates.
                mask_points.append((int(float(y) * h), int(float(x) * w)))
            for p in mask_points:
                mask[p] = 255
            masks.append(mask)
    # Local name chosen so the class-id list above isn't shadowed.
    cluster_labels, clustered_points = cluster_dbscan(masks[0].numpy(), test_img)
    print(f'labels:{cluster_labels}')
    print(f'clustered_points len:{len(clustered_points)}')
    show_cluster_dbscan(masks[0].numpy(), test_img, cluster_labels, clustered_points)
if __name__ == '__main__':
    # Entry point: currently only runs the mask-overlay visualization; the
    # other pipeline steps are kept here commented for manual use.
    # trans_datasets_format()
    # test_mask()
    # show_dataset()
    # test_img_path = r"F:\Downloads\severstal-steel-defect-detection\images\train\0025bde0c.jpg"
    test_img_path = r"F:\DevTools\datasets\renyaun\1012\spilt\images\train\2024-09-27-14-32-53_SaveImage.png"
    # test_img1_path = r"F:\Downloads\severstal-steel-defect-detection\images\train\1d00226a0.jpg"
    show_img_mask(test_img_path)
    # test_cluster(test_img_path)
|