123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221 |
- # 并行计算
- import time
- import skimage
- from models.line_detect.postprocess import show_predict
- import os
- import torch
- from PIL import Image
- import matplotlib.pyplot as plt
- import matplotlib as mpl
- import numpy as np
- from models.line_detect.line_net import linenet_resnet50_fpn
- from torchvision import transforms
- # from models.wirenet.postprocess import postprocess
- from models.wirenet.postprocess import postprocess
- from rtree import index
- device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
- import multiprocessing as mp
- def process_box(box, lines, scores, idx):
- line_ = []
- score_ = []
- for i in box:
- score_max = 0.0
- tmp = [[0.0, 0.0], [0.0, 0.0]]
- # 获取与当前box可能相交的所有线段
- possible_matches = list(idx.intersection((i[0], i[1], i[2], i[3])))
- for j in possible_matches:
- line_j = lines[0, j].cpu().numpy() / 128 * 512
- if (line_j[0][0] >= i[0] and line_j[1][0] >= i[0] and
- line_j[0][0] <= i[2] and line_j[1][0] <= i[2] and
- line_j[0][1] >= i[1] and line_j[1][1] >= i[1] and
- line_j[0][1] <= i[3] and line_j[1][1] <= i[3]):
- if scores[j] > score_max:
- tmp = line_j
- score_max = scores[j]
- line_.append(tmp)
- score_.append(score_max)
- return torch.tensor(line_), torch.tensor(score_)
- def box_line_optimized1(pred):
- # 创建R-tree索引
- idx = index.Index()
- # 将所有线段添加到R-tree中
- lines = pred[-1]['wires']['lines'] # 形状为[1, 2500, 2, 2]
- scores = pred[-1]['wires']['score'][0] # 假设形状为[2500]
- for idx_line in range(lines.shape[1]): # 遍历2500条线段
- line_tensor = lines[0, idx_line].cpu().numpy() / 128 * 512 # 转换为numpy数组并调整比例
- x_min = float(min(line_tensor[0][0], line_tensor[1][0]))
- y_min = float(min(line_tensor[0][1], line_tensor[1][1]))
- x_max = float(max(line_tensor[0][0], line_tensor[1][0]))
- y_max = float(max(line_tensor[0][1], line_tensor[1][1]))
- idx.insert(idx_line, (max(0, x_min - 256), max(0, y_min - 256), min(512, x_max + 256), min(512, y_max + 256)))
- # 准备要处理的数据
- data_to_process = []
- for box_ in pred[0:-1]:
- box = box_['boxes'].cpu().numpy() # 确保将张量转换为numpy数组
- data_to_process.append((box, lines, scores, idx))
- # 使用 Pool 创建进程池并行处理数据
- with mp.Pool(processes=mp.cpu_count()) as pool: # 根据 CPU 核心数创建进程池
- results = pool.starmap(process_box, data_to_process)
- # 将结果放回原始 pred 中
- for idx_box, (processed_list, processed_s_list) in enumerate(results):
- pred[idx_box]['line'] = processed_list
- pred[idx_box]['line_score'] = processed_s_list
- return pred
- def load_best_model(model, save_path, device):
- if os.path.exists(save_path):
- checkpoint = torch.load(save_path, map_location=device)
- model.load_state_dict(checkpoint['model_state_dict'])
- # if optimizer is not None:
- # optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
- epoch = checkpoint['epoch']
- loss = checkpoint['loss']
- print(f"Loaded best model from {save_path} at epoch {epoch} with loss {loss:.4f}")
- else:
- print(f"No saved model found at {save_path}")
- return model
- def box_line_(pred):
- for idx, box_ in enumerate(pred[0:-1]):
- box = box_['boxes'] # 是一个tensor
- line = pred[-1]['wires']['lines'][idx].cpu().numpy() / 128 * 512
- score = pred[-1]['wires']['score'][idx]
- line_ = []
- score_ = []
- for i in box:
- score_max = 0.0
- tmp = [[0.0, 0.0], [0.0, 0.0]]
- for j in range(len(line)):
- if (line[j][0][1] >= i[0] and line[j][1][1] >= i[0] and
- line[j][0][1] <= i[2] and line[j][1][1] <= i[2] and
- line[j][0][0] >= i[1] and line[j][1][0] >= i[1] and
- line[j][0][0] <= i[3] and line[j][1][0] <= i[3]):
- if score[j] > score_max:
- tmp = line[j]
- score_max = score[j]
- line_.append(tmp)
- score_.append(score_max)
- processed_list = torch.tensor(line_)
- pred[idx]['line'] = processed_list
- processed_s_list = torch.tensor(score_)
- pred[idx]['line_score'] = processed_s_list
- return pred
- def box_line_optimized(pred):
- # 创建R-tree索引
- idx = index.Index()
- # 将所有线段添加到R-tree中
- lines = pred[-1]['wires']['lines'] # 形状为[1, 2500, 2, 2]
- scores = pred[-1]['wires']['score'][0] # 假设形状为[2500]
- # 提取并处理所有线段
- for idx_line in range(lines.shape[1]): # 遍历2500条线段
- line_tensor = lines[0, idx_line].cpu().numpy() / 128 * 512 # 转换为numpy数组并调整比例
- x_min = float(min(line_tensor[0][0], line_tensor[1][0]))
- y_min = float(min(line_tensor[0][1], line_tensor[1][1]))
- x_max = float(max(line_tensor[0][0], line_tensor[1][0]))
- y_max = float(max(line_tensor[0][1], line_tensor[1][1]))
- idx.insert(idx_line, (max(0, x_min - 256), max(0, y_min - 256), min(512, x_max + 256), min(512, y_max + 256)))
- for idx_box, box_ in enumerate(pred[0:-1]):
- box = box_['boxes'].cpu().numpy() # 确保将张量转换为numpy数组
- line_ = []
- score_ = []
- for i in box:
- score_max = 0.0
- tmp = [[0.0, 0.0], [0.0, 0.0]]
- # 获取与当前box可能相交的所有线段
- possible_matches = list(idx.intersection((i[0], i[1], i[2], i[3])))
- for j in possible_matches:
- line_j = lines[0, j].cpu().numpy() / 128 * 512
- if (line_j[0][1] >= i[0] and line_j[1][1] >= i[0] and # 注意这里交换了x和y
- line_j[0][1] <= i[2] and line_j[1][1] <= i[2] and
- line_j[0][0] >= i[1] and line_j[1][0] >= i[1] and
- line_j[0][0] <= i[3] and line_j[1][0] <= i[3]):
- if scores[j] > score_max:
- tmp = line_j
- score_max = scores[j]
- line_.append(tmp)
- score_.append(score_max)
- processed_list = torch.tensor(line_)
- pred[idx_box]['line'] = processed_list
- processed_s_list = torch.tensor(score_)
- pred[idx_box]['line_score'] = processed_s_list
- return pred
- def predict(pt_path, model, img):
- model = load_best_model(model, pt_path, device)
- model.eval()
- if isinstance(img, str):
- img = Image.open(img).convert("RGB")
- transform = transforms.ToTensor()
- img_tensor = transform(img) # [3, 512, 512]
- # 将图像调整为512x512大小
- im = img_tensor.permute(1, 2, 0) # [512, 512, 3]
- im_resized = skimage.transform.resize(im.cpu().numpy().astype(np.float32), (512, 512)) # (512, 512, 3)
- img_ = torch.tensor(im_resized).permute(2, 0, 1)
- with torch.no_grad():
- predictions = model([img_.to(device)])
- # print(predictions)
- pred = box_line_optimized1(predictions)
- # print(pred)
- # pred = box_line_(predictions)
- show_predict(img_, pred, t_start)
- if __name__ == '__main__':
- t_start = time.time()
- print(f'start to predict:{t_start}')
- model = linenet_resnet50_fpn().to(device)
- pt_path = r'D:\python\PycharmProjects\20250214\weight\resnet50_best_e100.pth'
- # img_path = f'D:\python\PycharmProjects\data2\images/train/2024-11-27-15-41-38_SaveImage.png' # 工件图
- # img_path = f'D:\python\PycharmProjects\data\images/train/00558656_3.png' # wireframe图
- img_path = r'C:\Users\m2337\Desktop\p\49.jpg'
- predict(pt_path, model, img_path)
- t_end = time.time()
- print(f'predict used:{t_end - t_start}')
|