from typing import Dict, List, Optional, Tuple import matplotlib.pyplot as plt import torch import torch.nn.functional as F import torchvision # from scipy.optimize import linear_sum_assignment from torch import nn, Tensor from libs.vision_libs.ops import boxes as box_ops, roi_align import libs.vision_libs.models.detection._utils as det_utils from collections import OrderedDict from models.line_detect.heads.head_losses import point_inference, compute_point_loss, line_iou_loss, \ lines_point_pair_loss, features_align, line_inference, compute_ins_loss, ins_inference, compute_circle_loss, \ circle_inference, arc_inference1 from utils.data_process.show_prams import print_params def fastrcnn_loss(class_logits, box_regression, labels, regression_targets): # type: (Tensor, Tensor, List[Tensor], List[Tensor]) -> Tuple[Tensor, Tensor] """ Computes the loss for Faster R-CNN. Args: class_logits (Tensor) box_regression (Tensor) labels (list[BoxList]) regression_targets (Tensor) Returns: classification_loss (Tensor) box_loss (Tensor) """ # print(f'compute fastrcnn_loss:{labels}') labels = torch.cat(labels, dim=0) regression_targets = torch.cat(regression_targets, dim=0) classification_loss = F.cross_entropy(class_logits, labels) # get indices that correspond to the regression targets for # the corresponding ground truth labels, to be used with # advanced indexing sampled_pos_inds_subset = torch.where(labels > 0)[0] labels_pos = labels[sampled_pos_inds_subset] N, num_classes = class_logits.shape box_regression = box_regression.reshape(N, box_regression.size(-1) // 4, 4) box_loss = F.smooth_l1_loss( box_regression[sampled_pos_inds_subset, labels_pos], regression_targets[sampled_pos_inds_subset], beta=1 / 9, reduction="sum", ) box_loss = box_loss / labels.numel() return classification_loss, box_loss def maskrcnn_inference(x, labels): # type: (Tensor, List[Tensor]) -> List[Tensor] """ From the results of the CNN, post process the masks by taking the ins corresponding to the class with max probability (which are of fixed size and directly output by the CNN) and return the masks in the ins field of the BoxList. Args: x (Tensor): the ins logits labels (list[BoxList]): bounding boxes that are used as reference, one for ech image Returns: results (list[BoxList]): one BoxList for each image, containing the extra field ins """ mask_prob = x.sigmoid() # select masks corresponding to the predicted classes num_masks = x.shape[0] boxes_per_image = [label.shape[0] for label in labels] labels = torch.cat(labels) index = torch.arange(num_masks, device=labels.device) mask_prob = mask_prob[index, labels][:, None] mask_prob = mask_prob.split(boxes_per_image, dim=0) return mask_prob def project_masks_on_boxes(gt_masks, boxes, matched_idxs, M): # type: (Tensor, Tensor, Tensor, int) -> Tensor """ Given segmentation masks and the bounding boxes corresponding to the location of the masks in the image, this function crops and resizes the masks in the position defined by the boxes. This prepares the masks for them to be fed to the loss computation as the targets. 
""" matched_idxs = matched_idxs.to(boxes) rois = torch.cat([matched_idxs[:, None], boxes], dim=1) gt_masks = gt_masks[:, None].to(rois) return roi_align(gt_masks, rois, (M, M), 1.0)[:, 0] def maskrcnn_loss(mask_logits, proposals, gt_masks, gt_labels, mask_matched_idxs): # type: (Tensor, List[Tensor], List[Tensor], List[Tensor], List[Tensor]) -> Tensor """ Args: proposals (list[BoxList]) mask_logits (Tensor) targets (list[BoxList]) Return: mask_loss (Tensor): scalar tensor containing the loss """ discretization_size = mask_logits.shape[-1] labels = [gt_label[idxs] for gt_label, idxs in zip(gt_labels, mask_matched_idxs)] mask_targets = [ project_masks_on_boxes(m, p, i, discretization_size) for m, p, i in zip(gt_masks, proposals, mask_matched_idxs) ] labels = torch.cat(labels, dim=0) mask_targets = torch.cat(mask_targets, dim=0) # torch.mean (in binary_cross_entropy_with_logits) doesn't # accept empty tensors, so handle it separately if mask_targets.numel() == 0: return mask_logits.sum() * 0 mask_loss = F.binary_cross_entropy_with_logits( mask_logits[torch.arange(labels.shape[0], device=labels.device), labels], mask_targets ) return mask_loss def keypoints_to_heatmap(keypoints, rois, heatmap_size): # type: (Tensor, Tensor, int) -> Tuple[Tensor, Tensor] offset_x = rois[:, 0] offset_y = rois[:, 1] scale_x = heatmap_size / (rois[:, 2] - rois[:, 0]) scale_y = heatmap_size / (rois[:, 3] - rois[:, 1]) offset_x = offset_x[:, None] offset_y = offset_y[:, None] scale_x = scale_x[:, None] scale_y = scale_y[:, None] x = keypoints[..., 0] y = keypoints[..., 1] x_boundary_inds = x == rois[:, 2][:, None] y_boundary_inds = y == rois[:, 3][:, None] x = (x - offset_x) * scale_x x = x.floor().long() y = (y - offset_y) * scale_y y = y.floor().long() x[x_boundary_inds] = heatmap_size - 1 y[y_boundary_inds] = heatmap_size - 1 valid_loc = (x >= 0) & (y >= 0) & (x < heatmap_size) & (y < heatmap_size) vis = keypoints[..., 2] > 0 valid = (valid_loc & vis).long() lin_ind = y * heatmap_size + x heatmaps = lin_ind * valid return heatmaps, valid def _onnx_heatmaps_to_keypoints( maps, maps_i, roi_map_width, roi_map_height, widths_i, heights_i, offset_x_i, offset_y_i ): num_keypoints = torch.scalar_tensor(maps.size(1), dtype=torch.int64) width_correction = widths_i / roi_map_width height_correction = heights_i / roi_map_height roi_map = F.interpolate( maps_i[:, None], size=(int(roi_map_height), int(roi_map_width)), mode="bicubic", align_corners=False )[:, 0] w = torch.scalar_tensor(roi_map.size(2), dtype=torch.int64) pos = roi_map.reshape(num_keypoints, -1).argmax(dim=1) x_int = pos % w y_int = (pos - x_int) // w x = (torch.tensor(0.5, dtype=torch.float32) + x_int.to(dtype=torch.float32)) * width_correction.to( dtype=torch.float32 ) y = (torch.tensor(0.5, dtype=torch.float32) + y_int.to(dtype=torch.float32)) * height_correction.to( dtype=torch.float32 ) xy_preds_i_0 = x + offset_x_i.to(dtype=torch.float32) xy_preds_i_1 = y + offset_y_i.to(dtype=torch.float32) xy_preds_i_2 = torch.ones(xy_preds_i_1.shape, dtype=torch.float32) xy_preds_i = torch.stack( [ xy_preds_i_0.to(dtype=torch.float32), xy_preds_i_1.to(dtype=torch.float32), xy_preds_i_2.to(dtype=torch.float32), ], 0, ) # TODO: simplify when indexing without rank will be supported by ONNX base = num_keypoints * num_keypoints + num_keypoints + 1 ind = torch.arange(num_keypoints) ind = ind.to(dtype=torch.int64) * base end_scores_i = ( roi_map.index_select(1, y_int.to(dtype=torch.int64)) .index_select(2, x_int.to(dtype=torch.int64)) .view(-1) .index_select(0, 
ind.to(dtype=torch.int64)) ) return xy_preds_i, end_scores_i @torch.jit._script_if_tracing def _onnx_heatmaps_to_keypoints_loop( maps, rois, widths_ceil, heights_ceil, widths, heights, offset_x, offset_y, num_keypoints ): xy_preds = torch.zeros((0, 3, int(num_keypoints)), dtype=torch.float32, device=maps.device) end_scores = torch.zeros((0, int(num_keypoints)), dtype=torch.float32, device=maps.device) for i in range(int(rois.size(0))): xy_preds_i, end_scores_i = _onnx_heatmaps_to_keypoints( maps, maps[i], widths_ceil[i], heights_ceil[i], widths[i], heights[i], offset_x[i], offset_y[i] ) xy_preds = torch.cat((xy_preds.to(dtype=torch.float32), xy_preds_i.unsqueeze(0).to(dtype=torch.float32)), 0) end_scores = torch.cat( (end_scores.to(dtype=torch.float32), end_scores_i.to(dtype=torch.float32).unsqueeze(0)), 0 ) return xy_preds, end_scores def heatmaps_to_keypoints(maps, rois): """Extract predicted keypoint locations from heatmaps. Output has shape (#rois, 4, #keypoints) with the 4 rows corresponding to (x, y, logit, prob) for each keypoint. """ # This function converts a discrete image coordinate in a HEATMAP_SIZE x # HEATMAP_SIZE image to a continuous keypoint coordinate. We maintain # consistency with keypoints_to_heatmap_labels by using the conversion from # Heckbert 1990: c = d + 0.5, where d is a discrete coordinate and c is a # continuous coordinate. offset_x = rois[:, 0] offset_y = rois[:, 1] widths = rois[:, 2] - rois[:, 0] heights = rois[:, 3] - rois[:, 1] widths = widths.clamp(min=1) heights = heights.clamp(min=1) widths_ceil = widths.ceil() heights_ceil = heights.ceil() num_keypoints = maps.shape[1] if torchvision._is_tracing(): xy_preds, end_scores = _onnx_heatmaps_to_keypoints_loop( maps, rois, widths_ceil, heights_ceil, widths, heights, offset_x, offset_y, torch.scalar_tensor(num_keypoints, dtype=torch.int64), ) return xy_preds.permute(0, 2, 1), end_scores xy_preds = torch.zeros((len(rois), 3, num_keypoints), dtype=torch.float32, device=maps.device) end_scores = torch.zeros((len(rois), num_keypoints), dtype=torch.float32, device=maps.device) for i in range(len(rois)): roi_map_width = int(widths_ceil[i].item()) roi_map_height = int(heights_ceil[i].item()) width_correction = widths[i] / roi_map_width height_correction = heights[i] / roi_map_height roi_map = F.interpolate( maps[i][:, None], size=(roi_map_height, roi_map_width), mode="bicubic", align_corners=False )[:, 0] # roi_map_probs = scores_to_probs(roi_map.copy()) w = roi_map.shape[2] pos = roi_map.reshape(num_keypoints, -1).argmax(dim=1) x_int = pos % w y_int = torch.div(pos - x_int, w, rounding_mode="floor") # assert (roi_map_probs[k, y_int, x_int] == # roi_map_probs[k, :, :].max()) x = (x_int.float() + 0.5) * width_correction y = (y_int.float() + 0.5) * height_correction xy_preds[i, 0, :] = x + offset_x[i] xy_preds[i, 1, :] = y + offset_y[i] xy_preds[i, 2, :] = 1 end_scores[i, :] = roi_map[torch.arange(num_keypoints, device=roi_map.device), y_int, x_int] return xy_preds.permute(0, 2, 1), end_scores def keypointrcnn_loss(keypoint_logits, proposals, gt_keypoints, keypoint_matched_idxs): # type: (Tensor, List[Tensor], List[Tensor], List[Tensor]) -> Tensor N, K, H, W = keypoint_logits.shape if H != W: raise ValueError( f"keypoint_logits height and width (last two elements of shape) should be equal. 
Instead got H = {H} and W = {W}" ) discretization_size = H heatmaps = [] valid = [] for proposals_per_image, gt_kp_in_image, midx in zip(proposals, gt_keypoints, keypoint_matched_idxs): kp = gt_kp_in_image[midx] heatmaps_per_image, valid_per_image = keypoints_to_heatmap(kp, proposals_per_image, discretization_size) heatmaps.append(heatmaps_per_image.view(-1)) valid.append(valid_per_image.view(-1)) keypoint_targets = torch.cat(heatmaps, dim=0) valid = torch.cat(valid, dim=0).to(dtype=torch.uint8) valid = torch.where(valid)[0] # torch.mean (in binary_cross_entropy_with_logits) doesn't # accept empty tensors, so handle it sepaartely if keypoint_targets.numel() == 0 or len(valid) == 0: return keypoint_logits.sum() * 0 keypoint_logits = keypoint_logits.view(N * K, H * W) keypoint_loss = F.cross_entropy(keypoint_logits[valid], keypoint_targets[valid]) return keypoint_loss def keypointrcnn_inference(x, boxes): # type: (Tensor, List[Tensor]) -> Tuple[List[Tensor], List[Tensor]] kp_probs = [] kp_scores = [] boxes_per_image = [box.size(0) for box in boxes] x2 = x.split(boxes_per_image, dim=0) for xx, bb in zip(x2, boxes): kp_prob, scores = heatmaps_to_keypoints(xx, bb) kp_probs.append(kp_prob) kp_scores.append(scores) return kp_probs, kp_scores def _onnx_expand_boxes(boxes, scale): # type: (Tensor, float) -> Tensor w_half = (boxes[:, 2] - boxes[:, 0]) * 0.5 h_half = (boxes[:, 3] - boxes[:, 1]) * 0.5 x_c = (boxes[:, 2] + boxes[:, 0]) * 0.5 y_c = (boxes[:, 3] + boxes[:, 1]) * 0.5 w_half = w_half.to(dtype=torch.float32) * scale h_half = h_half.to(dtype=torch.float32) * scale boxes_exp0 = x_c - w_half boxes_exp1 = y_c - h_half boxes_exp2 = x_c + w_half boxes_exp3 = y_c + h_half boxes_exp = torch.stack((boxes_exp0, boxes_exp1, boxes_exp2, boxes_exp3), 1) return boxes_exp # the next two functions should be merged inside Masker # but are kept here for the moment while we need them # temporarily for paste_mask_in_image def expand_boxes(boxes, scale): # type: (Tensor, float) -> Tensor if torchvision._is_tracing(): return _onnx_expand_boxes(boxes, scale) w_half = (boxes[:, 2] - boxes[:, 0]) * 0.5 h_half = (boxes[:, 3] - boxes[:, 1]) * 0.5 x_c = (boxes[:, 2] + boxes[:, 0]) * 0.5 y_c = (boxes[:, 3] + boxes[:, 1]) * 0.5 w_half *= scale h_half *= scale boxes_exp = torch.zeros_like(boxes) boxes_exp[:, 0] = x_c - w_half boxes_exp[:, 2] = x_c + w_half boxes_exp[:, 1] = y_c - h_half boxes_exp[:, 3] = y_c + h_half return boxes_exp @torch.jit.unused def expand_masks_tracing_scale(M, padding): # type: (int, int) -> float return torch.tensor(M + 2 * padding).to(torch.float32) / torch.tensor(M).to(torch.float32) def expand_masks(mask, padding): # type: (Tensor, int) -> Tuple[Tensor, float] M = mask.shape[-1] if torch._C._get_tracing_state(): # could not import is_tracing(), not sure why scale = expand_masks_tracing_scale(M, padding) else: scale = float(M + 2 * padding) / M padded_mask = F.pad(mask, (padding,) * 4) return padded_mask, scale def paste_mask_in_image(mask, box, im_h, im_w): # type: (Tensor, Tensor, int, int) -> Tensor TO_REMOVE = 1 w = int(box[2] - box[0] + TO_REMOVE) h = int(box[3] - box[1] + TO_REMOVE) w = max(w, 1) h = max(h, 1) # Set shape to [batchxCxHxW] mask = mask.expand((1, 1, -1, -1)) # Resize ins mask = F.interpolate(mask, size=(h, w), mode="bilinear", align_corners=False) mask = mask[0][0] im_mask = torch.zeros((im_h, im_w), dtype=mask.dtype, device=mask.device) x_0 = max(box[0], 0) x_1 = min(box[2] + 1, im_w) y_0 = max(box[1], 0) y_1 = min(box[3] + 1, im_h) im_mask[y_0:y_1, x_0:x_1] = mask[(y_0 - 
box[1]): (y_1 - box[1]), (x_0 - box[0]): (x_1 - box[0])] return im_mask def _onnx_paste_mask_in_image(mask, box, im_h, im_w): one = torch.ones(1, dtype=torch.int64) zero = torch.zeros(1, dtype=torch.int64) w = box[2] - box[0] + one h = box[3] - box[1] + one w = torch.max(torch.cat((w, one))) h = torch.max(torch.cat((h, one))) # Set shape to [batchxCxHxW] mask = mask.expand((1, 1, mask.size(0), mask.size(1))) # Resize ins mask = F.interpolate(mask, size=(int(h), int(w)), mode="bilinear", align_corners=False) mask = mask[0][0] x_0 = torch.max(torch.cat((box[0].unsqueeze(0), zero))) x_1 = torch.min(torch.cat((box[2].unsqueeze(0) + one, im_w.unsqueeze(0)))) y_0 = torch.max(torch.cat((box[1].unsqueeze(0), zero))) y_1 = torch.min(torch.cat((box[3].unsqueeze(0) + one, im_h.unsqueeze(0)))) unpaded_im_mask = mask[(y_0 - box[1]): (y_1 - box[1]), (x_0 - box[0]): (x_1 - box[0])] # TODO : replace below with a dynamic padding when support is added in ONNX # pad y zeros_y0 = torch.zeros(y_0, unpaded_im_mask.size(1)) zeros_y1 = torch.zeros(im_h - y_1, unpaded_im_mask.size(1)) concat_0 = torch.cat((zeros_y0, unpaded_im_mask.to(dtype=torch.float32), zeros_y1), 0)[0:im_h, :] # pad x zeros_x0 = torch.zeros(concat_0.size(0), x_0) zeros_x1 = torch.zeros(concat_0.size(0), im_w - x_1) im_mask = torch.cat((zeros_x0, concat_0, zeros_x1), 1)[:, :im_w] return im_mask @torch.jit._script_if_tracing def _onnx_paste_masks_in_image_loop(masks, boxes, im_h, im_w): res_append = torch.zeros(0, im_h, im_w) for i in range(masks.size(0)): mask_res = _onnx_paste_mask_in_image(masks[i][0], boxes[i], im_h, im_w) mask_res = mask_res.unsqueeze(0) res_append = torch.cat((res_append, mask_res)) return res_append def paste_masks_in_image(masks, boxes, img_shape, padding=1): # type: (Tensor, Tensor, Tuple[int, int], int) -> Tensor masks, scale = expand_masks(masks, padding=padding) boxes = expand_boxes(boxes, scale).to(dtype=torch.int64) im_h, im_w = img_shape if torchvision._is_tracing(): return _onnx_paste_masks_in_image_loop( masks, boxes, torch.scalar_tensor(im_h, dtype=torch.int64), torch.scalar_tensor(im_w, dtype=torch.int64) )[:, None] res = [paste_mask_in_image(m[0], b, im_h, im_w) for m, b in zip(masks, boxes)] if len(res) > 0: ret = torch.stack(res, dim=0)[:, None] else: ret = masks.new_empty((0, 1, im_h, im_w)) return ret class RoIHeads(nn.Module): __annotations__ = { "box_coder": det_utils.BoxCoder, "proposal_matcher": det_utils.Matcher, "fg_bg_sampler": det_utils.BalancedPositiveNegativeSampler, } def __init__( self, box_roi_pool, box_head, box_predictor, # Faster R-CNN training fg_iou_thresh, bg_iou_thresh, batch_size_per_image, positive_fraction, bbox_reg_weights, # Faster R-CNN inference score_thresh, nms_thresh, detections_per_img, # Line line_roi_pool=None, line_head=None, line_predictor=None, # point parameters point_roi_pool=None, point_head=None, point_predictor=None, ins_head=None, ins_predictor=None, ins_roi_pool=None, # arc parameters arc_roi_pool=None, arc_head=None, arc_predictor=None, # Mask mask_roi_pool=None, mask_head=None, mask_predictor=None, keypoint_roi_pool=None, keypoint_head=None, keypoint_predictor=None, detect_point=True, detect_line=False, detect_arc=False, detect_ins=False, ): super().__init__() self.box_similarity = box_ops.box_iou # assign ground-truth boxes for each proposal self.proposal_matcher = det_utils.Matcher(fg_iou_thresh, bg_iou_thresh, allow_low_quality_matches=False) self.fg_bg_sampler = det_utils.BalancedPositiveNegativeSampler(batch_size_per_image, positive_fraction) if 
bbox_reg_weights is None: bbox_reg_weights = (10.0, 10.0, 5.0, 5.0) self.box_coder = det_utils.BoxCoder(bbox_reg_weights) self.box_roi_pool = box_roi_pool self.box_head = box_head self.box_predictor = box_predictor self.score_thresh = score_thresh self.nms_thresh = nms_thresh self.detections_per_img = detections_per_img self.line_roi_pool = line_roi_pool self.line_head = line_head self.line_predictor = line_predictor self.point_roi_pool = point_roi_pool self.point_head = point_head self.point_predictor = point_predictor self.arc_roi_pool = arc_roi_pool self.arc_head = arc_head self.arc_predictor = arc_predictor self.ins_roi_pool = ins_roi_pool self.ins_head = ins_head self.ins_predictor = ins_predictor self.mask_roi_pool = mask_roi_pool self.mask_head = mask_head self.mask_predictor = mask_predictor self.keypoint_roi_pool = keypoint_roi_pool self.keypoint_head = keypoint_head self.keypoint_predictor = keypoint_predictor self.detect_point =detect_point self.detect_line =detect_line self.detect_arc =detect_arc self.detect_ins=detect_ins self.channel_compress = nn.Sequential( nn.Conv2d(256, 8, kernel_size=1), nn.BatchNorm2d(8), nn.ReLU(inplace=True) ) def has_mask(self): if self.mask_roi_pool is None: return False if self.mask_head is None: return False if self.mask_predictor is None: return False return True def has_keypoint(self): if self.keypoint_roi_pool is None: return False if self.keypoint_head is None: return False if self.keypoint_predictor is None: return False return True def has_line(self): # if self.line_roi_pool is None: # return False if self.line_head is None: return False # if self.line_predictor is None: # return False return True def has_point(self): # if self.line_roi_pool is None: # return False if self.point_head is None: return False # if self.line_predictor is None: # return False return True def has_arc(self): # if self.line_roi_pool is None: # return False if self.arc_head is None: return False # if self.line_predictor is None: # return False return True def has_ins(self): # if self.line_roi_pool is None: # return False if self.ins_head is None: return False # if self.line_predictor is None: # return False return True def assign_targets_to_proposals(self, proposals, gt_boxes, gt_labels): # type: (List[Tensor], List[Tensor], List[Tensor]) -> Tuple[List[Tensor], List[Tensor]] matched_idxs = [] labels = [] for proposals_in_image, gt_boxes_in_image, gt_labels_in_image in zip(proposals, gt_boxes, gt_labels): if gt_boxes_in_image.numel() == 0: # Background image device = proposals_in_image.device clamped_matched_idxs_in_image = torch.zeros( (proposals_in_image.shape[0],), dtype=torch.int64, device=device ) labels_in_image = torch.zeros((proposals_in_image.shape[0],), dtype=torch.int64, device=device) else: # set to self.box_similarity when https://github.com/pytorch/pytorch/issues/27495 lands match_quality_matrix = box_ops.box_iou(gt_boxes_in_image, proposals_in_image) matched_idxs_in_image = self.proposal_matcher(match_quality_matrix) clamped_matched_idxs_in_image = matched_idxs_in_image.clamp(min=0) labels_in_image = gt_labels_in_image[clamped_matched_idxs_in_image] labels_in_image = labels_in_image.to(dtype=torch.int64) # Label background (below the low threshold) bg_inds = matched_idxs_in_image == self.proposal_matcher.BELOW_LOW_THRESHOLD labels_in_image[bg_inds] = 0 # Label ignore proposals (between low and high thresholds) ignore_inds = matched_idxs_in_image == self.proposal_matcher.BETWEEN_THRESHOLDS labels_in_image[ignore_inds] = -1 # -1 is ignored by sampler 
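            # Matching outcome convention (descriptive note): proposals below the low IoU
            # threshold keep label 0 (background), proposals between the thresholds keep
            # label -1 and are skipped by the BalancedPositiveNegativeSampler, and the
            # remaining positives keep their ground-truth class index.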
matched_idxs.append(clamped_matched_idxs_in_image) labels.append(labels_in_image) return matched_idxs, labels def subsample(self, labels): # type: (List[Tensor]) -> List[Tensor] sampled_pos_inds, sampled_neg_inds = self.fg_bg_sampler(labels) sampled_inds = [] for img_idx, (pos_inds_img, neg_inds_img) in enumerate(zip(sampled_pos_inds, sampled_neg_inds)): img_sampled_inds = torch.where(pos_inds_img | neg_inds_img)[0] sampled_inds.append(img_sampled_inds) return sampled_inds def add_gt_proposals(self, proposals, gt_boxes): # type: (List[Tensor], List[Tensor]) -> List[Tensor] proposals = [torch.cat((proposal, gt_box)) for proposal, gt_box in zip(proposals, gt_boxes)] return proposals def check_targets(self, targets): # type: (Optional[List[Dict[str, Tensor]]]) -> None if targets is None: raise ValueError("targets should not be None") if not all(["boxes" in t for t in targets]): raise ValueError("Every element of targets should have a boxes key") if not all(["labels" in t for t in targets]): raise ValueError("Every element of targets should have a labels key") if self.has_mask(): if not all(["masks" in t for t in targets]): raise ValueError("Every element of targets should have a masks key") def select_training_samples( self, proposals, # type: List[Tensor] targets, # type: Optional[List[Dict[str, Tensor]]] ): # type: (...) -> Tuple[List[Tensor], List[Tensor], List[Tensor], List[Tensor]] self.check_targets(targets) if targets is None: raise ValueError("targets should not be None") dtype = proposals[0].dtype device = proposals[0].device gt_boxes = [t["boxes"].to(dtype) for t in targets] gt_labels = [t["labels"] for t in targets] # append ground-truth bboxes to propos proposals = self.add_gt_proposals(proposals, gt_boxes) # get matching gt indices for each proposal matched_idxs, labels = self.assign_targets_to_proposals(proposals, gt_boxes, gt_labels) # sample a fixed proportion of positive-negative proposals sampled_inds = self.subsample(labels) matched_gt_boxes = [] num_images = len(proposals) for img_id in range(num_images): img_sampled_inds = sampled_inds[img_id] proposals[img_id] = proposals[img_id][img_sampled_inds] labels[img_id] = labels[img_id][img_sampled_inds] matched_idxs[img_id] = matched_idxs[img_id][img_sampled_inds] gt_boxes_in_image = gt_boxes[img_id] if gt_boxes_in_image.numel() == 0: gt_boxes_in_image = torch.zeros((1, 4), dtype=dtype, device=device) matched_gt_boxes.append(gt_boxes_in_image[matched_idxs[img_id]]) regression_targets = self.box_coder.encode(matched_gt_boxes, proposals) return proposals, matched_idxs, labels, regression_targets def postprocess_detections( self, class_logits, # type: Tensor box_regression, # type: Tensor proposals, # type: List[Tensor] image_shapes, # type: List[Tuple[int, int]] ): # type: (...) 
-> Tuple[List[Tensor], List[Tensor], List[Tensor]] device = class_logits.device num_classes = class_logits.shape[-1] boxes_per_image = [boxes_in_image.shape[0] for boxes_in_image in proposals] pred_boxes = self.box_coder.decode(box_regression, proposals) pred_scores = F.softmax(class_logits, -1) pred_boxes_list = pred_boxes.split(boxes_per_image, 0) pred_scores_list = pred_scores.split(boxes_per_image, 0) all_boxes = [] all_scores = [] all_labels = [] for boxes, scores, image_shape in zip(pred_boxes_list, pred_scores_list, image_shapes): boxes = box_ops.clip_boxes_to_image(boxes, image_shape) # create labels for each prediction labels = torch.arange(num_classes, device=device) labels = labels.view(1, -1).expand_as(scores) # remove predictions with the background label boxes = boxes[:, 1:] scores = scores[:, 1:] labels = labels[:, 1:] # batch everything, by making every class prediction be a separate instance boxes = boxes.reshape(-1, 4) scores = scores.reshape(-1) labels = labels.reshape(-1) # remove low scoring boxes inds = torch.where(scores > self.score_thresh)[0] boxes, scores, labels = boxes[inds], scores[inds], labels[inds] # remove empty boxes keep = box_ops.remove_small_boxes(boxes, min_size=1e-2) boxes, scores, labels = boxes[keep], scores[keep], labels[keep] # non-maximum suppression, independently done per class keep = box_ops.batched_nms(boxes, scores, labels, self.nms_thresh) # keep only topk scoring predictions keep = keep[: self.detections_per_img] boxes, scores, labels = boxes[keep], scores[keep], labels[keep] all_boxes.append(boxes) all_scores.append(scores) all_labels.append(labels) return all_boxes, all_scores, all_labels def forward( self, features, # type: Dict[str, Tensor] proposals, # type: List[Tensor] image_shapes, # type: List[Tuple[int, int]] targets=None, # type: Optional[List[Dict[str, Tensor]]] ): # type: (...) 
-> Tuple[List[Dict[str, Tensor]], Dict[str, Tensor]] """ Args: features (List[Tensor]) proposals (List[Tensor[N, 4]]) image_shapes (List[Tuple[H, W]]) targets (List[Dict]) """ print(f'roihead forward!!!') if targets is not None: for t in targets: # TODO: https://github.com/pytorch/pytorch/issues/26731 floating_point_types = (torch.float, torch.double, torch.half) if not t["boxes"].dtype in floating_point_types: raise TypeError(f"target boxes must of float type, instead got {t['boxes'].dtype}") if not t["labels"].dtype == torch.int64: raise TypeError(f"target labels must of int64 type, instead got {t['labels'].dtype}") if self.has_keypoint(): if not t["keypoints"].dtype == torch.float32: raise TypeError(f"target keypoints must of float type, instead got {t['keypoints'].dtype}") if self.training: proposals, matched_idxs, labels, regression_targets = self.select_training_samples(proposals, targets) else: if targets is not None: proposals, matched_idxs, labels, regression_targets = self.select_training_samples(proposals, targets) else: labels = None regression_targets = None matched_idxs = None device=features['0'].device box_features = self.box_roi_pool(features, proposals, image_shapes) box_features = self.box_head(box_features) class_logits, box_regression = self.box_predictor(box_features) result: List[Dict[str, torch.Tensor]] = [] losses = {} # _, C, H, W = features['0'].shape # 忽略 batch_size,因为我们只关心 C, H, W if self.training: if labels is None: raise ValueError("labels cannot be None") if regression_targets is None: raise ValueError("regression_targets cannot be None") print(f'boxes compute losses') loss_classifier, loss_box_reg = fastrcnn_loss(class_logits, box_regression, labels, regression_targets) losses = {"loss_classifier": loss_classifier, "loss_box_reg": loss_box_reg} else: if targets is not None: loss_classifier, loss_box_reg = fastrcnn_loss(class_logits, box_regression, labels, regression_targets) losses = {"loss_classifier": loss_classifier, "loss_box_reg": loss_box_reg} boxes, scores, labels = self.postprocess_detections(class_logits, box_regression, proposals, image_shapes) num_images = len(boxes) for i in range(num_images): result.append( { "boxes": boxes[i], "labels": labels[i], "scores": scores[i], } ) if self.has_line() and self.detect_line: print(f'roi_heads forward has_line()!!!!') # print(f'labels:{labels}') line_proposals = [p["boxes"] for p in result] point_proposals = [p["boxes"] for p in result] print(f'boxes_proposals:{len(line_proposals)}') # if line_proposals is None or len(line_proposals) == 0: # # 返回空特征或者跳过该部分计算 # return torch.empty(0, C, H, W).to(features['0'].device) if self.training: # during training, only focus on positive boxes num_images = len(proposals) print(f'num_images:{num_images}') line_proposals = [] point_proposals = [] arc_proposals = [] pos_matched_idxs = [] line_pos_matched_idxs = [] point_pos_matched_idxs = [] if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): pos = torch.where(labels[img_id] > 0)[0] line_pos=torch.where(labels[img_id] ==2)[0] # point_pos=torch.where(labels[img_id] ==1)[0] line_proposals.append(proposals[img_id][line_pos]) # point_proposals.append(proposals[img_id][point_pos]) line_pos_matched_idxs.append(matched_idxs[img_id][line_pos]) # point_pos_matched_idxs.append(matched_idxs[img_id][point_pos]) # pos_matched_idxs.append(matched_idxs[img_id][pos]) else: if targets is not None: pos_matched_idxs = [] num_images = len(proposals) line_proposals = [] 
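                    # Evaluation path mirrors the training branch below: only proposals whose
                    # matched label is 2 (the line class in this head) are forwarded to the line branch.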
line_pos_matched_idxs = [] print(f'val num_images:{num_images}') if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): # pos = torch.where(labels[img_id] > 0)[0] line_pos = torch.where(labels[img_id] == 2)[0] line_proposals.append(proposals[img_id][line_pos]) line_pos_matched_idxs.append(matched_idxs[img_id][line_pos]) else: pos_matched_idxs = None line_proposals_valid=self.check_proposals(line_proposals) if line_proposals_valid: feature_logits = self.line_forward3(features, image_shapes, line_proposals) loss_line = None loss_line_iou =None if self.training: if targets is None or pos_matched_idxs is None: raise ValueError("both targets and pos_matched_idxs should not be None when in training mode") gt_lines = [t["lines"] for t in targets if "lines" in t] # print(f'gt_lines:{gt_lines[0].shape}') h, w = targets[0]["img_size"] img_size = h gt_lines_tensor=torch.zeros(0,0) if len(gt_lines)>0: gt_lines_tensor = torch.cat(gt_lines) print(f'gt_lines_tensor:{gt_lines_tensor.shape}') if gt_lines_tensor.shape[0]>0 : print(f'start to lines_point_pair_loss') loss_line = lines_point_pair_loss( feature_logits, line_proposals, gt_lines, line_pos_matched_idxs ) loss_line_iou = line_iou_loss(feature_logits, line_proposals, gt_lines, line_pos_matched_idxs, img_size) if loss_line is None: print(f'loss_line is None111') loss_line = torch.tensor(0.0, device=device) if loss_line_iou is None: print(f'loss_line_iou is None111') loss_line_iou = torch.tensor(0.0, device=device) loss_line = {"loss_line": loss_line} loss_line_iou = {'loss_line_iou': loss_line_iou} else: if targets is not None: h, w = targets[0]["img_size"] img_size = h gt_lines = [t["lines"] for t in targets if "lines" in t] gt_lines_tensor = torch.zeros(0, 0) if len(gt_lines)>0: gt_lines_tensor = torch.cat(gt_lines) if gt_lines_tensor.shape[0] > 0 and feature_logits is not None: loss_line = lines_point_pair_loss( feature_logits, line_proposals, gt_lines, line_pos_matched_idxs ) print(f'compute_line_loss:{loss_line}') loss_line_iou = line_iou_loss(feature_logits , line_proposals, gt_lines, line_pos_matched_idxs, img_size) if loss_line is None: print(f'loss_line is None') loss_line=torch.tensor(0.0,device=device) if loss_line_iou is None: print(f'loss_line_iou is None') loss_line_iou=torch.tensor(0.0,device=device) loss_line = {"loss_line": loss_line} loss_line_iou = {'loss_line_iou': loss_line_iou} else: loss_line = {} loss_line_iou = {} if feature_logits is None or line_proposals is None: raise ValueError( "both keypoint_logits and keypoint_proposals should not be None when not in training mode" ) if feature_logits is not None: lines_probs, lines_scores = line_inference(feature_logits,line_proposals) for masks, kps, r in zip(lines_probs, lines_scores, result): r["lines"] = masks r["lines_scores"] = kps print(f'loss_line11111:{loss_line}') losses.update(loss_line) losses.update(loss_line_iou) print(f'losses:{losses}') if self.has_point() and self.detect_point: print(f'roi_heads forward has_point()!!!!') # print(f'labels:{labels}') point_proposals = [p["boxes"] for p in result] print(f'boxes_proposals:{len(point_proposals)}') # if line_proposals is None or len(line_proposals) == 0: # # 返回空特征或者跳过该部分计算 # return torch.empty(0, C, H, W).to(features['0'].device) if self.training: # during training, only focus on positive boxes num_images = len(proposals) print(f'num_images:{num_images}') point_proposals = [] point_pos_matched_idxs = [] if matched_idxs is None: raise ValueError("if in 
trainning, matched_idxs should not be None") for img_id in range(num_images): point_pos=torch.where(labels[img_id] ==1)[0] point_proposals.append(proposals[img_id][point_pos]) point_pos_matched_idxs.append(matched_idxs[img_id][point_pos]) else: if targets is not None: num_images = len(proposals) point_proposals = [] point_pos_matched_idxs = [] print(f'val num_images:{num_images}') if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): point_pos = torch.where(labels[img_id] == 1)[0] point_proposals.append(proposals[img_id][point_pos]) point_pos_matched_idxs.append(matched_idxs[img_id][point_pos]) else: pos_matched_idxs = None point_proposals_valid = self.check_proposals(point_proposals) if point_proposals_valid: feature_logits = self.point_forward1(features, image_shapes, point_proposals) loss_point=None if self.training: if targets is None or point_pos_matched_idxs is None: raise ValueError("both targets and pos_matched_idxs should not be None when in training mode") gt_points = [t["points"] for t in targets if "points" in t] print(f'gt_points:{gt_points[0].shape}') h, w = targets[0]["img_size"] img_size = h gt_points_tensor = torch.zeros(0, 0) if len(gt_points) > 0: gt_points_tensor = torch.cat(gt_points) print(f'gt_points_tensor:{gt_points_tensor.shape}') if gt_points_tensor.shape[0] > 0: print(f'start to compute point_loss') loss_point=compute_point_loss(feature_logits,point_proposals,gt_points,point_pos_matched_idxs) if loss_point is None: print(f'loss_point is None111') loss_point = torch.tensor(0.0, device=device) loss_point = {"loss_point": loss_point} else: if targets is not None: h, w = targets[0]["img_size"] img_size = h gt_points = [t["points"] for t in targets if "points" in t] gt_points_tensor = torch.zeros(0, 0) if len(gt_points) > 0: gt_points_tensor = torch.cat(gt_points) print(f'gt_points_tensor:{gt_points_tensor.shape}') if gt_points_tensor.shape[0] > 0: print(f'start to compute point_loss') loss_point = compute_point_loss(feature_logits, point_proposals, gt_points, point_pos_matched_idxs) if loss_point is None: print(f'loss_point is None111') loss_point = torch.tensor(0.0, device=device) loss_point = {"loss_point": loss_point} else: loss_point = {} if feature_logits is None or point_proposals is None: raise ValueError( "both keypoint_logits and keypoint_proposals should not be None when not in training mode" ) if feature_logits is not None: points_probs, points_scores = point_inference(feature_logits,point_proposals) for masks, kps, r in zip(points_probs, points_scores, result): r["points"] = masks r["points_scores"] = kps print(f'loss_point:{loss_point}') losses.update(loss_point) print(f'losses:{losses}') if self.has_arc() and self.detect_arc: print(f'roi_heads forward has_arc()!!!!') # print(f'labels:{labels}') arc_proposals = [p["boxes"] for p in result] print(f'boxes_proposals:{len(arc_proposals)}') print(f'boxes_proposals:{len(arc_proposals)}') # if line_proposals is None or len(line_proposals) == 0: # # 返回空特征或者跳过该部分计算 # return torch.empty(0, C, H, W).to(features['0'].device) if self.training: # during training, only focus on positive boxes num_images = len(proposals) print(f'num_images:{num_images}') arc_proposals = [] arc_pos_matched_idxs = [] if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): arc_pos=torch.where(labels[img_id] ==3)[0] arc_proposals.append(proposals[img_id][arc_pos]) 
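                        # Keep the matched GT indices aligned with the arc proposals selected above
                        # (label 3 marks arc instances in this head).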
arc_pos_matched_idxs.append(matched_idxs[img_id][arc_pos]) else: if targets is not None: num_images = len(proposals) arc_proposals = [] arc_pos_matched_idxs = [] print(f'val num_images:{num_images}') if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): arc_pos = torch.where(labels[img_id] == 3)[0] arc_proposals.append(proposals[img_id][arc_pos]) arc_pos_matched_idxs.append(matched_idxs[img_id][arc_pos]) else: arc_pos_matched_idxs = None arc_proposals_valid=self.check_proposals(arc_proposals) if arc_proposals_valid: feature_logits = self.arc_forward1(features, image_shapes, arc_proposals) loss_arc=None if self.training: if targets is None or arc_pos_matched_idxs is None: raise ValueError("both targets and pos_matched_idxs should not be None when in training mode") gt_arcs = [t["arc_mask"] for t in targets if "arc_mask" in t] print(f'gt_arcs:{gt_arcs[0].shape}') h, w = targets[0]["img_size"] img_size = h if len(gt_arcs) > 0 and feature_logits is not None: loss_arc = compute_ins_loss(feature_logits, arc_proposals, gt_arcs, arc_pos_matched_idxs) if loss_arc is None: print(f'loss_arc is None111') loss_arc = torch.tensor(0.0, device=device) loss_arc = {"loss_arc": loss_arc} else: if targets is not None: h, w = targets[0]["img_size"] img_size = h gt_arcs = [t["arc_mask"] for t in targets if "arc_mask" in t] print(f'gt_arcs:{gt_arcs[0].shape}') h, w = targets[0]["img_size"] img_size = h if len(gt_arcs) > 0 and feature_logits is not None: print(f'start to compute arc_loss') loss_arc = compute_ins_loss(feature_logits, arc_proposals, gt_arcs, arc_pos_matched_idxs) if loss_arc is None: print(f'loss_arc is None111') loss_arc = torch.tensor(0.0, device=device) loss_arc = {"loss_arc": loss_arc} else: loss_arc = {} if feature_logits is None or arc_proposals is None: # raise ValueError( # "both arc_feature_logits and arc_proposals should not be None when not in training mode" # ) print(f'error :both arc_feature_logits and arc_proposals should not be None when not in training mode"') pass if feature_logits is not None and arc_proposals is not None: arcs_probs, arcs_scores, arcs_point = ins_inference(feature_logits, arc_proposals, th=0) for masks, kps, kp, r in zip(arcs_probs, arcs_scores, arcs_point, result): # r["arcs"] = keypoint_prob r["arcs"] = feature_logits r["arcs_scores"] = kps r["arcs_point"] = feature_logits # print(f'loss_point:{loss_point}') losses.update(loss_arc) print(f'losses:{losses}') if self.has_ins and self.detect_ins: print(f'roi_heads forward has_circle()!!!!') print(f'labels:{labels}') # for p in result: # print(f'p["boxes"]:{p["boxes"]}') # print(f'p["labels"]:{p["labels"]}') ins_proposals = [p["boxes"][p["labels"] == 4] for p in result] # print(f'ins_proposals11:{ins_proposals}') print(f'boxes_proposals:{len(ins_proposals)}') if self.training: # during training, only focus on positive boxes num_images = len(proposals) print(f'num_images:{num_images}') ins_proposals = [] ins_pos_matched_idxs = [] if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): circle_pos = torch.where(labels[img_id] == 4)[0] circle_pos = circle_pos.flatten() idxs = circle_pos.detach().cpu().tolist() num_prop = len(proposals[img_id]) for idx in idxs: if idx < 0 or idx >= num_prop: raise RuntimeError( f"Index out of bounds: circle_pos={idx}, but proposals len={num_prop}, " f"img_id={img_id}" ) ins_proposals.append(proposals[img_id][idxs]) 
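                        # Gather the matched GT indices with the same bounds-checked index list so
                        # they stay aligned with ins_proposals (label 4 marks circle/ins instances here).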
ins_pos_matched_idxs.append(matched_idxs[img_id][idxs]) print(f'ins_proposals in train:{ins_proposals}') else: if targets is not None: num_images = len(proposals) ins_proposals = [] ins_pos_matched_idxs = [] print(f'val num_images:{num_images}') if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): circle_pos = torch.where(labels[img_id] == 4)[0] circle_pos = circle_pos.flatten() idxs = circle_pos.detach().cpu().tolist() num_prop = len(proposals[img_id]) for idx in idxs: if idx < 0 or idx >= num_prop: raise RuntimeError( f"Index out of bounds: circle_pos={idx}, but proposals len={num_prop}, " f"img_id={img_id}" ) ins_proposals.append(proposals[img_id][idxs]) ins_pos_matched_idxs.append(matched_idxs[img_id][idxs]) print(f'ins_proposals in val:{ins_proposals}') else: pos_matched_idxs = None # circle_proposals_tensor=torch.cat(circle_proposals) ins_proposals_valid = self.check_proposals(ins_proposals) print(f"self.train:{self.training}") print(f"proposals_valid:{ins_proposals_valid}") if ins_proposals_valid: print(f'features from backbone:{features['0'].shape}') print(f'ins_proposals in ins_forward1:{ins_proposals}') feature_logits = self.ins_forward1(features, image_shapes, ins_proposals) # ins_masks, ins_scores, circle_points = ins_inference(feature_logits, # ins_proposals, th=0) # plt.title('feature_logits') # plt.imshow(feature_logits[0][0].cpu().detach().numpy()) # plt.show() arc_equation = self.arc_equation_head(feature_logits) # [proposal和,9] loss_ins = None loss_ins_extra=None loss_arc_equation = None loss_arc_ends = None if self.training: print("circle loss!!!!!!") if targets is None or ins_pos_matched_idxs is None: raise ValueError("both targets and pos_matched_idxs should not be None when in training mode") gt_inses = [t["circle_masks"] for t in targets if "circle_masks" in t] gt_labels = [t["labels"] for t in targets] gt_arcs = [t["arc_mask"] for t in targets if "arc_mask" in t] gt_mask_ends = [t["mask_ends"] for t in targets if "mask_ends" in t] gt_mask_params = [t["mask_params"] for t in targets if "mask_params" in t] # h, w = targets[0]["img_size"] # img_size = h gt_ins_tensor = torch.zeros(0, 0) if len(gt_inses) > 0: print_params(gt_inses) gt_ins_tensor = torch.cat(gt_inses) print(f'gt_ins_tensor:{gt_ins_tensor.shape}') if gt_ins_tensor.shape[0] > 0: print(f'start to compute circle_loss') loss_ins = compute_ins_loss(feature_logits, ins_proposals, gt_inses,ins_pos_matched_idxs) # total_loss, loss_arc_equation, loss_arc_ends = compute_arc_equation_loss(arc_equation,ins_proposals,gt_mask_ends,gt_mask_params,ins_pos_matched_idxs,labels) # loss_arc_ends = loss_arc_ends # if loss_arc_equation is None: # print(f'loss_arc_equation is None') # loss_arc_equation = torch.tensor(0.0, device=device) # # if loss_arc_ends is None: # print(f'loss_arc_ends is None') # loss_arc_ends = torch.tensor(0.0, device=device) if loss_ins is None: print(f'loss_ins is None111') loss_ins = torch.tensor(0.0, device=device) # if loss_ins_extra is None: # print(f'loss_ins_extra is None111') # loss_ins_extra = torch.tensor(0.0, device=device) loss_ins = {"loss_ins": loss_ins} # loss_ins_extra = {"loss_ins_extra": loss_ins_extra} # loss_arc_equation = {"loss_arc_equation": loss_arc_equation} # loss_arc_ends = {"loss_arc_ends": loss_arc_ends} else: if targets is not None: # h, w = targets[0]["img_size"] # img_size = h gt_inses = [t["circle_masks"] for t in targets if "circle_masks" in t] gt_labels = [t["labels"] for t in targets] 
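                    # mask_ends / mask_params below are the GT arc end points and ellipse parameters;
                    # they feed compute_arc_equation_loss, which is currently commented out. Note that
                    # self.arc_equation_head used above is assumed to be attached to this module
                    # elsewhere; it is not created in __init__.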
gt_mask_ends = [t["mask_ends"] for t in targets if "mask_ends" in t] gt_mask_params = [t["mask_params"] for t in targets if "mask_params" in t] gt_ins_tensor = torch.zeros(0, 0) if len(gt_inses) > 0: gt_ins_tensor = torch.cat(gt_inses) print(f'gt_ins_tensor:{gt_ins_tensor.shape}') if gt_ins_tensor.shape[0] > 0: print(f'start to compute circle_loss') loss_ins = compute_ins_loss(feature_logits, ins_proposals, gt_inses, ins_pos_matched_idxs) # total_loss, loss_arc_equation, loss_arc_ends = compute_arc_equation_loss(arc_equation,ins_proposals,gt_mask_ends,gt_mask_params,ins_pos_matched_idxs,labels) # # loss_arc_ends = loss_arc_ends # loss_ins_extra = compute_circle_extra_losses(feature_logits, circle_proposals, gt_circles,circle_pos_matched_idxs) if loss_ins is None: print(f'loss_ins is None111') loss_ins = torch.tensor(0.0, device=device) # if loss_ins_extra is None: # print(f'loss_ins_extra is None111') # loss_ins_extra = torch.tensor(0.0, device=device) # if loss_arc_equation is None: # print(f'loss_arc_equation is None') # loss_arc_equation = torch.tensor(0.0, device=device) # # if loss_arc_ends is None: # print(f'loss_arc_ends is None') # loss_arc_ends = torch.tensor(0.0, device=device) loss_ins = {"loss_ins": loss_ins} # loss_ins_extra = {"loss_ins_extra": loss_ins_extra} # loss_arc_equation = {"loss_arc_equation": loss_arc_equation} # loss_arc_ends = {"loss_arc_ends": loss_arc_ends} else: loss_ins = {} # loss_ins_extra = {} # loss_arc_equation = {} # loss_arc_ends = {} if feature_logits is None or ins_proposals is None: raise ValueError( "both keypoint_logits and keypoint_proposals should not be None when not in training mode" ) if feature_logits is not None: ins_masks, ins_scores, circle_points = ins_inference(feature_logits, ins_proposals, th=0) arc7, arc_scores = arc_inference1(arc_equation, feature_logits, ins_proposals, 0.5) for arc_, arc_score, r in zip(arc7, arc_scores, result): r["arcs"] = arc_ r["arc_scores"] = arc_score # print(f'circles_probs:{circles_probs.shape}, circles_scores:{circles_scores.shape}') proposals_per_image = [box.size(0) for box in ins_proposals] print(f'ins_proposals_per_image:{proposals_per_image}') feature_logits_props = [] start_idx = 0 for num_p in proposals_per_image: current_features = feature_logits[start_idx:start_idx + num_p] merged_feature = torch.sum(current_features, dim=0, keepdim=True) feature_logits_props.append(merged_feature) start_idx += num_p for masks, kps, r, f in zip(ins_masks, ins_scores, result, feature_logits_props): r["ins_masks"] = masks r["ins_scores"] = kps print(f'ins feature map:{f.shape}') r["features"] = f.squeeze(0) print(f'loss_ins:{loss_ins}') # print(f'loss_ins_extra:{loss_ins_extra}') losses.update(loss_ins) # losses.update(loss_ins_extra) # losses.update(loss_arc_equation) # losses.update(loss_arc_ends) print(f'losses:{losses}') if self.has_mask(): mask_proposals = [p["boxes"] for p in result] if self.training: if matched_idxs is None: raise ValueError("if in training, matched_idxs should not be None") # during training, only focus on positive boxes num_images = len(proposals) mask_proposals = [] pos_matched_idxs = [] for img_id in range(num_images): pos = torch.where(labels[img_id] > 0)[0] mask_proposals.append(proposals[img_id][pos]) pos_matched_idxs.append(matched_idxs[img_id][pos]) else: pos_matched_idxs = None if self.mask_roi_pool is not None: mask_features = self.mask_roi_pool(features, mask_proposals, image_shapes) mask_features = self.mask_head(mask_features) mask_logits = self.mask_predictor(mask_features) 
else: raise Exception("Expected mask_roi_pool to be not None") loss_mask = {} if self.training: if targets is None or pos_matched_idxs is None or mask_logits is None: raise ValueError("targets, pos_matched_idxs, mask_logits cannot be None when training") gt_masks = [t["masks"] for t in targets] gt_labels = [t["labels"] for t in targets] rcnn_loss_mask = maskrcnn_loss(mask_logits, mask_proposals, gt_masks, gt_labels, pos_matched_idxs) loss_mask = {"loss_mask": rcnn_loss_mask} else: labels = [r["labels"] for r in result] masks_probs = maskrcnn_inference(mask_logits, labels) for mask_prob, r in zip(masks_probs, result): r["masks"] = mask_prob losses.update(loss_mask) # keep none checks in if conditional so torchscript will conditionally # compile each branch if self.has_keypoint(): keypoint_proposals = [p["boxes"] for p in result] if self.training: # during training, only focus on positive boxes num_images = len(proposals) keypoint_proposals = [] pos_matched_idxs = [] if matched_idxs is None: raise ValueError("if in trainning, matched_idxs should not be None") for img_id in range(num_images): pos = torch.where(labels[img_id] > 0)[0] keypoint_proposals.append(proposals[img_id][pos]) pos_matched_idxs.append(matched_idxs[img_id][pos]) else: pos_matched_idxs = None keypoint_features = self.line_roi_pool(features, keypoint_proposals, image_shapes) keypoint_features = self.line_head(keypoint_features) keypoint_logits = self.line_predictor(keypoint_features) loss_keypoint = {} if self.training: if targets is None or pos_matched_idxs is None: raise ValueError("both targets and pos_matched_idxs should not be None when in training mode") gt_keypoints = [t["keypoints"] for t in targets] rcnn_loss_keypoint = keypointrcnn_loss( keypoint_logits, keypoint_proposals, gt_keypoints, pos_matched_idxs ) loss_keypoint = {"loss_keypoint": rcnn_loss_keypoint} else: if keypoint_logits is None or keypoint_proposals is None: raise ValueError( "both keypoint_logits and keypoint_proposals should not be None when not in training mode" ) keypoints_probs, lines_scores = keypointrcnn_inference(keypoint_logits, keypoint_proposals) for masks, kps, r in zip(keypoints_probs, lines_scores, result): r["keypoints"] = masks r["keypoints_scores"] = kps losses.update(loss_keypoint) return result, losses def check_proposals(self, proposals): valid = True for proposal in proposals: # print(f'per circle_proposal:{circle_proposal.shape}') if proposal.shape[0] == 0: valid = False return valid def line_forward1(self, features, image_shapes, line_proposals): print(f'line_proposals:{len(line_proposals)}') # cs_features= features['0'] # print(f'features-0:{features['0'].shape}') cs_features = self.channel_compress(features['0']) filtered_proposals = [proposal for proposal in line_proposals if proposal.shape[0] > 0] if len(filtered_proposals) > 0: filtered_proposals_tensor = torch.cat(filtered_proposals) print(f'filtered_proposals_tensor:{filtered_proposals_tensor.shape}') line_proposals_tensor = torch.cat(line_proposals) print(f'line_proposals_tensor:{line_proposals_tensor.shape}') roi_features = features_align(cs_features, line_proposals, image_shapes) if roi_features is not None: print(f'line_features from align:{roi_features.shape}') feature_logits = self.line_head(roi_features) print(f'feature_logits from line_head:{feature_logits.shape}') return feature_logits def line_forward2(self, features, image_shapes, line_proposals): print(f'line_proposals:{len(line_proposals)}') # cs_features= features['0'] # 
print(f'features-0:{features['0'].shape}') # cs_features = self.channel_compress(features['0']) cs_features=features['0'] filtered_proposals = [proposal for proposal in line_proposals if proposal.shape[0] > 0] if len(filtered_proposals) > 0: filtered_proposals_tensor = torch.cat(filtered_proposals) print(f'filtered_proposals_tensor:{filtered_proposals_tensor.shape}') line_proposals=filtered_proposals line_proposals_tensor = torch.cat(line_proposals) print(f'line_proposals_tensor:{line_proposals_tensor.shape}') feature_logits = self.line_head(cs_features) print(f'feature_logits from line_head:{feature_logits.shape}') roi_features = features_align(feature_logits, line_proposals, image_shapes) if roi_features is not None: print(f'roi_features from align:{roi_features.shape}') return roi_features def line_forward3(self, features, image_shapes, line_proposals): print(f'line_proposals:{len(line_proposals)}') # cs_features= features['0'] # print(f'features-0:{features['0'].shape}') # cs_features = self.channel_compress(features['0']) cs_features=features['0'] # cs_features = features # filtered_proposals = [proposal for proposal in line_proposals if proposal.shape[0] > 0] # # if len(filtered_proposals) > 0: # filtered_proposals_tensor = torch.cat(filtered_proposals) # print(f'filtered_proposals_tensor:{filtered_proposals_tensor.shape}') # line_proposals=filtered_proposals # line_proposals_tensor = torch.cat(line_proposals) # print(f'line_proposals_tensor:{line_proposals_tensor.shape}') feature_logits = self.line_predictor(cs_features) print(f'feature_logits from line_head:{feature_logits.shape}') roi_features = features_align(feature_logits, line_proposals, image_shapes) if roi_features is not None: print(f'roi_features from align:{roi_features.shape}') return roi_features def point_forward1(self, features, image_shapes, proposals): print(f'point_proposals:{len(proposals)}') # cs_features= features['0'] # print(f'features-0:{features['0'].shape}') # cs_features = self.channel_compress(features['0']) cs_features=features['0'] # filtered_proposals = [proposal for proposal in proposals if proposal.shape[0] > 0] # # if len(filtered_proposals) > 0: # filtered_proposals_tensor = torch.cat(filtered_proposals) # print(f'filtered_proposals_tensor:{filtered_proposals_tensor.shape}') # proposals=filtered_proposals # point_proposals_tensor = torch.cat(proposals) # print(f'point_proposals_tensor:{point_proposals_tensor.shape}') feature_logits = self.point_predictor(cs_features) print(f'feature_logits from line_head:{feature_logits.shape}') roi_features = features_align(feature_logits, proposals, image_shapes) if roi_features is not None: print(f'roi_features from align:{roi_features.shape}') return roi_features def ins_forward1(self, features, image_shapes, proposals): print(f'circle_proposals:{len(proposals)}') # cs_features= features['0'] # print(f'features-0:{features['0'].shape}') # cs_features = self.channel_compress(features['0']) # cs_features=features['0'] cs_features = features # filtered_proposals = [proposal for proposal in proposals if proposal.shape[0] > 0] # # if len(filtered_proposals) > 0: # filtered_proposals_tensor = torch.cat(filtered_proposals) # print(f'filtered_proposals_tensor:{filtered_proposals_tensor.shape}') # proposals=filtered_proposals # point_proposals_tensor = torch.cat(proposals) # print(f'point_proposals_tensor:{point_proposals_tensor.shape}') feature_logits = self.ins_head(cs_features) print(f'feature_logits from circle_head:{feature_logits.shape}') roi_features = 
features_align(feature_logits, proposals, image_shapes)
        if roi_features is not None:
            print(f'roi_features from align:{roi_features.shape}')
        return roi_features

    def arc_forward1(self, features, image_shapes, proposals):
        print(f'arc_proposals:{len(proposals)}')
        # cs_features= features['0']
        # print(f'features-0:{features['0'].shape}')
        # cs_features = self.channel_compress(features['0'])
        # cs_features=features['0']
        cs_features = features
        # filtered_proposals = [proposal for proposal in proposals if proposal.shape[0] > 0]
        #
        # if len(filtered_proposals) > 0:
        #     filtered_proposals_tensor = torch.cat(filtered_proposals)
        #     print(f'filtered_proposals_tensor:{filtered_proposals_tensor.shape}')
        #     proposals=filtered_proposals
        #     point_proposals_tensor = torch.cat(proposals)
        #     print(f'point_proposals_tensor:{point_proposals_tensor.shape}')
        feature_logits = self.arc_predictor(cs_features)
        print(f'feature_logits from arc_head:{feature_logits.shape}')
        roi_features = features_align(feature_logits, proposals, image_shapes)
        if roi_features is not None:
            print(f'roi_features from align:{roi_features.shape}')
        return roi_features


import numpy as np


def compute_arc_equation_loss(arc_equation, proposals, gt_mask_ends, gt_mask_params, arc_pos_matched_idxs,
                              gt_labels_all):
    """
    Compute the loss between predicted arc equations and ground truth.

    Args:
        arc_equation: Tensor of shape (total proposals in the batch, 9); columns [:5] are the
            ellipse parameters and columns [5:9] the arc end points (see the slicing below)
        proposals: list of length B with the arc proposals per image
        gt_mask_ends: GT arc end points (used for the end-point term)
        gt_mask_params: list of length B, each an (num_gt, 5) array of ellipse parameters
        arc_pos_matched_idxs: list of length B, each a Tensor of indices matching proposals to GT
        gt_labels_all: list of length B, GT labels
    """
    len_proposals = len(proposals)  # batch size
    device = arc_equation[0].device
    print(
        f'compute_arc_equation_loss line_logits.shape:{arc_equation.shape},len_proposals:{len_proposals},line_matched_idxs:{arc_pos_matched_idxs}')
    print(f'gt_mask_ends:{gt_mask_ends}, gt_mask_params:{gt_mask_params}')

    # gt_angles = []
    # # for gt_mask_end,gt_mask_param in zip(gt_mask_ends, gt_mask_params):
    # #     print(f'gt_mask_end:{gt_mask_end}, gt_mask_param:{gt_mask_param}')
    # #     gt_angles.append(compute_arc_angles(gt_mask_end,gt_mask_param))
    # for i in range(len(gt_mask_ends)):
    #     print(f'gt_mask_end:{gt_mask_ends[i]}, gt_mask_param:{gt_mask_params[i]}')
    #     gt_angles.append(compute_arc_angles(gt_mask_ends[i], gt_mask_params[i]))
    # print(f'gt_angles:{gt_angles}')

    print(f'gt_mask_params:{gt_mask_params}')
    print(f'gt_labels_all:{gt_labels_all}')
    print(f'arc_pos_matched_idxs:{arc_pos_matched_idxs}')

    gt_sel_params = []
    gt_sel_angles = []
    for proposals_per_image, gt_ends, gt_params, gt_label, midx in zip(proposals, gt_mask_ends, gt_mask_params,
                                                                       gt_labels_all, arc_pos_matched_idxs):
        print(f'line_proposals_per_image:{proposals_per_image.shape}')
        # gt_angle = torch.tensor(gt_angle)
        # gt_ends = torch.tensor(gt_ends)
        # gt_params = torch.tensor(gt_params)
        if isinstance(gt_ends, np.ndarray):
            gt_ends = torch.from_numpy(gt_ends).float()  # numpy -> tensor (float32)
        else:
            gt_ends = gt_ends.clone().detach().float()  # tensor -> copy, detach from the graph, cast to float32

        if isinstance(gt_params, np.ndarray):
            gt_params = torch.from_numpy(gt_params).float()
        else:
            gt_params = gt_params.clone().detach().float()

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        gt_ends = gt_ends.to(device)
        gt_params = gt_params.to(device)

        if gt_ends.shape[0] > 0:
            # positions = (gt_label == 3).nonzero()[0].item()
            po = gt_ends[midx.cpu()]
            pa = gt_params[midx.cpu()]
            print(f'po:{po},pa:{pa}')
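            # Despite the name, gt_sel_angles collects the selected GT end points (po) and
            # gt_sel_params the matching ellipse parameters (pa); both are indexed by midx so
            # they line up with the per-proposal predictions concatenated in arc_equation.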
            gt_sel_angles.append(po)
            gt_sel_params.append(pa)

    gt_sel_angles = torch.cat(gt_sel_angles, dim=0)
    gt_sel_params = torch.cat(gt_sel_params, dim=0)

    pred_ends = arc_equation[:, 5:9]
    pred_params = arc_equation[:, :5]
    # print_params(pred_angles, pred_params, gt_sel_angles, gt_sel_params)

    # pred_sin = torch.sin(pred_angles)
    # pred_cos = torch.cos(pred_angles)
    # gt_sin = torch.sin(gt_sel_angles)
    # gt_cos = torch.cos(gt_sel_angles)
    # angle_loss = F.mse_loss(pred_sin, gt_sin) + F.mse_loss(pred_cos, gt_cos)

    param_loss = F.mse_loss(pred_params, gt_sel_params) / 10000

    print("start")
    print_params(pred_ends, gt_sel_angles)
    pred_ends = pred_ends.view(-1, 2, 2)
    print("end")
    print_params(pred_ends, gt_sel_angles)

    ends_loss = F.mse_loss(pred_ends, gt_sel_angles) / 10000
    # print(f'angle_loss:{angle_loss.item()}, param_loss:{param_loss.item()}')

    count = sum(len(sublist) for sublist in proposals)
    total_loss = ((param_loss + ends_loss) / count) if count > 0 else torch.tensor(0.0, device=device, dtype=torch.float)

    total_loss = total_loss.to(device)
    ends_loss = ends_loss.to(device)
    param_loss = param_loss.to(device)
    # print(f'total_loss, param_loss, angle_loss: {total_loss.item()}, {param_loss.item()}, {angle_loss.item()}')
    return total_loss, param_loss, ends_loss

    # angle_loss = F.mse_loss(pred_angles, gt_sel_angles)
    # param_loss = F.mse_loss(pred_params.cpu(), gt_sel_params) / 10000
    # print(f'angle_loss:{angle_loss}, param_loss:{param_loss}')
    #
    # count = sum(len(sublist) for sublist in proposals)
    #
    # total_loss = (param_loss + angle_loss) / count if count > 0 else torch.tensor(0.0)
    #
    # # ensure dtype and device
    # total_loss = total_loss.float().to(device)
    # angle_loss = angle_loss.float().to(device)
    # param_loss = param_loss.float().to(device)
    #
    # print(f'total_loss, param_loss, angle_loss:{total_loss, param_loss, angle_loss}')
    #
    # return total_loss, param_loss, angle_loss


def compute_arc_angles(gt_mask_ends, gt_mask_params):
    """
    Given points on ellipses, compute the corresponding parametric angles phi (in radians).

    Parameters:
        gt_mask_ends: iterable of points, each (x, y)
        gt_mask_params: array-like of ellipse parameters, each (xc, yc, a, b, theta)

    Returns:
        results: list of angles phi, each in [0, 2*pi)
    """
    results = []
    gt_mask_params_tensor = torch.tensor(gt_mask_params, dtype=gt_mask_ends.dtype, device=gt_mask_ends.device)
    for ends_img, params_img in zip(gt_mask_ends, gt_mask_params_tensor):
        # print(f'params_img:{params_img}')
        if torch.norm(params_img) < 1e-6:  # L2 norm near zero
            results.append(torch.zeros(2, device=params_img.device, dtype=params_img.dtype))
            continue

        x, y = ends_img
        xc, yc, a, b, theta = params_img

        # 1. translate to the ellipse centre
        dx = x - xc
        dy = y - yc

        # 2. rotate back by -theta
        cos_t = torch.cos(theta)
        sin_t = torch.sin(theta)
        X = dx * cos_t + dy * sin_t
        Y = -dx * sin_t + dy * cos_t

        # 3. normalise to the unit circle (divide by a and b)
        cos_phi = X / a
        sin_phi = Y / b

        # 4. recover the angle with atan2 (handles the quadrant automatically)
        phi = torch.atan2(sin_phi, cos_phi)

        # 5. map into [0, 2*pi)
        phi = torch.where(phi < 0, phi + 2 * torch.pi, phi)

        results.append(phi)

    return results
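
if __name__ == "__main__":
    # Minimal self-check sketch for compute_arc_angles (illustrative values only, not part of
    # the detection pipeline): for an axis-aligned ellipse centred at the origin with a=2, b=1
    # and theta=0, the point (0, 1) sits at parametric angle phi = pi/2.
    _demo_ends = torch.tensor([[0.0, 1.0]])
    _demo_params = [[0.0, 0.0, 2.0, 1.0, 0.0]]
    print(compute_arc_angles(_demo_ends, _demo_params))  # expected: [tensor(1.5708)]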