import os from typing import Any, Callable, List, Optional, Tuple, Union import torch from torch import nn from libs.vision_libs import ops from libs.vision_libs.models import MobileNet_V3_Large_Weights, mobilenet_v3_large, EfficientNet_V2_S_Weights, \ efficientnet_v2_s, detection, EfficientNet_V2_L_Weights, efficientnet_v2_l, EfficientNet_V2_M_Weights, \ efficientnet_v2_m from libs.vision_libs.models.detection.anchor_utils import AnchorGenerator from libs.vision_libs.models.detection.rpn import RPNHead, RegionProposalNetwork from libs.vision_libs.models.detection.ssdlite import _mobilenet_extractor from libs.vision_libs.models.detection.transform import GeneralizedRCNNTransform from libs.vision_libs.ops import misc as misc_nn_ops, MultiScaleRoIAlign from libs.vision_libs.transforms._presets import ObjectDetection from libs.vision_libs.models._api import register_model, Weights, WeightsEnum from libs.vision_libs.models._meta import _COCO_PERSON_CATEGORIES, _COCO_PERSON_KEYPOINT_NAMES, _COCO_CATEGORIES from libs.vision_libs.models._utils import _ovewrite_value_param, handle_legacy_interface from libs.vision_libs.models.resnet import resnet50, ResNet50_Weights, ResNet18_Weights, resnet18 from libs.vision_libs.models.detection._utils import overwrite_eps from libs.vision_libs.models.detection.backbone_utils import _resnet_fpn_extractor, _validate_trainable_layers, \ BackboneWithFPN from libs.vision_libs.models.detection.faster_rcnn import FasterRCNN, TwoMLPHead, FastRCNNPredictor from .roi_heads import RoIHeads from .trainer import Trainer from ..base import backbone_factory from ..base.backbone_factory import get_convnext_fpn, get_anchor_generator # from ..base.backbone_factory import get_convnext_fpn, get_anchor_generator from ..base.base_detection_net import BaseDetectionNet import torch.nn.functional as F from ..base.high_reso_resnet import resnet50fpn, resnet18fpn __all__ = [ "LineDetect", "LineDetect_ResNet50_FPN_Weights", "linedetect_resnet50_fpn", ] def _default_anchorgen(): anchor_sizes = ((32,), (64,), (128,), (256,), (512,)) aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes) return AnchorGenerator(anchor_sizes, aspect_ratios) class LineDetect(BaseDetectionNet): def __init__( self, backbone, num_classes=None, # transform parameters min_size=512, max_size=1333, image_mean=None, image_std=None, # RPN parameters rpn_anchor_generator=None, rpn_head=None, rpn_pre_nms_top_n_train=2000, rpn_pre_nms_top_n_test=1000, rpn_post_nms_top_n_train=2000, rpn_post_nms_top_n_test=1000, rpn_nms_thresh=0.7, rpn_fg_iou_thresh=0.7, rpn_bg_iou_thresh=0.3, rpn_batch_size_per_image=256, rpn_positive_fraction=0.5, rpn_score_thresh=0.0, # Box parameters box_roi_pool=None, box_head=None, box_predictor=None, box_score_thresh=0.05, box_nms_thresh=0.5, box_detections_per_img=100, box_fg_iou_thresh=0.5, box_bg_iou_thresh=0.5, box_batch_size_per_image=512, box_positive_fraction=0.25, bbox_reg_weights=None, # keypoint parameters line_roi_pool=None, line_head=None, line_predictor=None, num_keypoints=None, **kwargs, ): out_channels = backbone.out_channels if rpn_anchor_generator is None: rpn_anchor_generator = _default_anchorgen() if rpn_head is None: rpn_head = RPNHead(out_channels, rpn_anchor_generator.num_anchors_per_location()[0]) rpn_pre_nms_top_n = dict(training=rpn_pre_nms_top_n_train, testing=rpn_pre_nms_top_n_test) rpn_post_nms_top_n = dict(training=rpn_post_nms_top_n_train, testing=rpn_post_nms_top_n_test) rpn = RegionProposalNetwork( rpn_anchor_generator, rpn_head, rpn_fg_iou_thresh, rpn_bg_iou_thresh, rpn_batch_size_per_image, rpn_positive_fraction, rpn_pre_nms_top_n, rpn_post_nms_top_n, rpn_nms_thresh, score_thresh=rpn_score_thresh, ) if box_roi_pool is None: box_roi_pool = MultiScaleRoIAlign(featmap_names=["0", "1", "2", "3"], output_size=7, sampling_ratio=2) if box_head is None: resolution = box_roi_pool.output_size[0] representation_size = 1024 box_head = TwoMLPHead(out_channels * resolution**2, representation_size) if box_predictor is None: representation_size = 1024 box_predictor = ObjectionPredictor(representation_size, num_classes) roi_heads = RoIHeads( # Box box_roi_pool, box_head, box_predictor, box_fg_iou_thresh, box_bg_iou_thresh, box_batch_size_per_image, box_positive_fraction, bbox_reg_weights, box_score_thresh, box_nms_thresh, box_detections_per_img, ) if image_mean is None: image_mean = [0.485, 0.456, 0.406] if image_std is None: image_std = [0.229, 0.224, 0.225] transform = GeneralizedRCNNTransform(min_size, max_size, image_mean, image_std, **kwargs) super().__init__(backbone, rpn, roi_heads, transform) if not isinstance(line_roi_pool, (MultiScaleRoIAlign, type(None))): raise TypeError( "keypoint_roi_pool should be of type MultiScaleRoIAlign or None instead of {type(keypoint_roi_pool)}" ) if min_size is None: min_size = (640, 672, 704, 736, 768, 800) if num_keypoints is not None: if line_predictor is not None: raise ValueError("num_keypoints should be None when keypoint_predictor is specified") else: num_keypoints = 2 if line_roi_pool is None: line_roi_pool = MultiScaleRoIAlign(featmap_names=["0", "1", "2", "3"], output_size=14, sampling_ratio=2) if line_head is None: keypoint_layers = tuple(512 for _ in range(8)) line_head = LineHeads(out_channels, keypoint_layers) if line_predictor is None: keypoint_dim_reduced = 512 # == keypoint_layers[-1] line_predictor = LinePredictor(keypoint_dim_reduced, num_keypoints) self.roi_heads.keypoint_roi_pool = line_roi_pool self.roi_heads.keypoint_head = line_head self.roi_heads.keypoint_predictor = line_predictor def start_train(self, cfg): # cfg = read_yaml(cfg) self.trainer = Trainer() self.trainer.train_from_cfg(model=self, cfg=cfg) def load_weights(self, save_path, device='cuda'): if os.path.exists(save_path): checkpoint = torch.load(save_path, map_location=device) self.load_state_dict(checkpoint['model_state_dict']) # if optimizer is not None: # optimizer.load_state_dict(checkpoint['optimizer_state_dict']) # epoch = checkpoint['epoch'] # loss = checkpoint['loss'] # print(f"Loaded best model from {save_path} at epoch {epoch} with loss {loss:.4f}") print(f"Loaded model from {save_path}") else: print(f"No saved model found at {save_path}") return self class TwoMLPHead(nn.Module): """ Standard heads for FPN-based models Args: in_channels (int): number of input channels representation_size (int): size of the intermediate representation """ def __init__(self, in_channels, representation_size): super().__init__() self.fc6 = nn.Linear(in_channels, representation_size) self.fc7 = nn.Linear(representation_size, representation_size) def forward(self, x): x = x.flatten(start_dim=1) x = F.relu(self.fc6(x)) x = F.relu(self.fc7(x)) return x class ObjectionConvFCHead(nn.Sequential): def __init__( self, input_size: Tuple[int, int, int], conv_layers: List[int], fc_layers: List[int], norm_layer: Optional[Callable[..., nn.Module]] = None, ): """ Args: input_size (Tuple[int, int, int]): the input size in CHW format. conv_layers (list): feature dimensions of each Convolution layer fc_layers (list): feature dimensions of each FCN layer norm_layer (callable, optional): Module specifying the normalization layer to use. Default: None """ in_channels, in_height, in_width = input_size blocks = [] previous_channels = in_channels for current_channels in conv_layers: blocks.append(misc_nn_ops.Conv2dNormActivation(previous_channels, current_channels, norm_layer=norm_layer)) previous_channels = current_channels blocks.append(nn.Flatten()) previous_channels = previous_channels * in_height * in_width for current_channels in fc_layers: blocks.append(nn.Linear(previous_channels, current_channels)) blocks.append(nn.ReLU(inplace=True)) previous_channels = current_channels super().__init__(*blocks) for layer in self.modules(): if isinstance(layer, nn.Conv2d): nn.init.kaiming_normal_(layer.weight, mode="fan_out", nonlinearity="relu") if layer.bias is not None: nn.init.zeros_(layer.bias) class ObjectionPredictor(nn.Module): """ Standard classification + bounding box regression layers for Fast R-CNN. Args: in_channels (int): number of input channels num_classes (int): number of output classes (including background) """ def __init__(self, in_channels, num_classes): super().__init__() self.cls_score = nn.Linear(in_channels, num_classes) self.bbox_pred = nn.Linear(in_channels, num_classes * 4) def forward(self, x): if x.dim() == 4: torch._assert( list(x.shape[2:]) == [1, 1], f"x has the wrong shape, expecting the last two dimensions to be [1,1] instead of {list(x.shape[2:])}", ) x = x.flatten(start_dim=1) scores = self.cls_score(x) bbox_deltas = self.bbox_pred(x) return scores, bbox_deltas class LineHeads(nn.Sequential): def __init__(self, in_channels, layers): d = [] next_feature = in_channels for out_channels in layers: d.append(nn.Conv2d(next_feature, out_channels, 3, stride=1, padding=1)) d.append(nn.ReLU(inplace=True)) next_feature = out_channels super().__init__(*d) for m in self.children(): if isinstance(m, nn.Conv2d): nn.init.kaiming_normal_(m.weight, mode="fan_out", nonlinearity="relu") nn.init.constant_(m.bias, 0) class LinePredictor(nn.Module): def __init__(self, in_channels, num_keypoints): super().__init__() input_features = in_channels deconv_kernel = 4 self.kps_score_lowres = nn.ConvTranspose2d( input_features, num_keypoints, deconv_kernel, stride=2, padding=deconv_kernel // 2 - 1, ) nn.init.kaiming_normal_(self.kps_score_lowres.weight, mode="fan_out", nonlinearity="relu") nn.init.constant_(self.kps_score_lowres.bias, 0) self.up_scale = 2 self.out_channels = num_keypoints def forward(self, x): x = self.kps_score_lowres(x) return torch.nn.functional.interpolate( x, scale_factor=float(self.up_scale), mode="bilinear", align_corners=False, recompute_scale_factor=False ) _COMMON_META = { "categories": _COCO_PERSON_CATEGORIES, "keypoint_names": _COCO_PERSON_KEYPOINT_NAMES, "min_size": (1, 1), } class LineDetect_ResNet50_FPN_Weights(WeightsEnum): COCO_LEGACY = Weights( url="https://download.pytorch.org/models/keypointrcnn_resnet50_fpn_coco-9f466800.pth", transforms=ObjectDetection, meta={ **_COMMON_META, "num_params": 59137258, "recipe": "https://github.com/pytorch/vision/issues/1606", "_metrics": { "COCO-val2017": { "box_map": 50.6, "kp_map": 61.1, } }, "_ops": 133.924, "_file_size": 226.054, "_docs": """ These weights were produced by following a similar training recipe as on the paper but use a checkpoint from an early epoch. """, }, ) COCO_V1 = Weights( url="https://download.pytorch.org/models/keypointrcnn_resnet50_fpn_coco-fc266e95.pth", transforms=ObjectDetection, meta={ **_COMMON_META, "num_params": 59137258, "recipe": "https://github.com/pytorch/vision/tree/main/references/detection#keypoint-r-cnn", "_metrics": { "COCO-val2017": { "box_map": 54.6, "kp_map": 65.0, } }, "_ops": 137.42, "_file_size": 226.054, "_docs": """These weights were produced by following a similar training recipe as on the paper.""", }, ) DEFAULT = COCO_V1 @register_model() @handle_legacy_interface( weights=( "pretrained", lambda kwargs: LineDetect_ResNet50_FPN_Weights.COCO_LEGACY if kwargs["pretrained"] == "legacy" else LineDetect_ResNet50_FPN_Weights.COCO_V1, ), weights_backbone=("pretrained_backbone", ResNet50_Weights.IMAGENET1K_V1), ) def lineDetect_resnet18_fpn( *, weights: Optional[LineDetect_ResNet50_FPN_Weights] = None, progress: bool = True, num_classes: Optional[int] = None, num_keypoints: Optional[int] = None, weights_backbone: Optional[ResNet50_Weights] = ResNet50_Weights.IMAGENET1K_V1, trainable_backbone_layers: Optional[int] = None, **kwargs: Any, ) -> LineDetect: """ Constructs a Keypoint R-CNN model with a ResNet-50-FPN backbone. .. betastatus:: detection module Reference: `Mask R-CNN `__. The input to the model is expected to be a list of tensors, each of shape ``[C, H, W]``, one for each image, and should be in ``0-1`` range. Different images can have different sizes. The behavior of the model changes depending on if it is in training or evaluation mode. During training, the model expects both the input tensors and targets (list of dictionary), containing: - boxes (``FloatTensor[N, 4]``): the ground-truth boxes in ``[x1, y1, x2, y2]`` format, with ``0 <= x1 < x2 <= W`` and ``0 <= y1 < y2 <= H``. - labels (``Int64Tensor[N]``): the class label for each ground-truth box - keypoints (``FloatTensor[N, K, 3]``): the ``K`` keypoints location for each of the ``N`` instances, in the format ``[x, y, visibility]``, where ``visibility=0`` means that the keypoint is not visible. The model returns a ``Dict[Tensor]`` during training, containing the classification and regression losses for both the RPN and the R-CNN, and the keypoint loss. During inference, the model requires only the input tensors, and returns the post-processed predictions as a ``List[Dict[Tensor]]``, one for each input image. The fields of the ``Dict`` are as follows, where ``N`` is the number of detected instances: - boxes (``FloatTensor[N, 4]``): the predicted boxes in ``[x1, y1, x2, y2]`` format, with ``0 <= x1 < x2 <= W`` and ``0 <= y1 < y2 <= H``. - labels (``Int64Tensor[N]``): the predicted labels for each instance - scores (``Tensor[N]``): the scores or each instance - keypoints (``FloatTensor[N, K, 3]``): the locations of the predicted keypoints, in ``[x, y, v]`` format. For more details on the output, you may refer to :ref:`instance_seg_output`. Keypoint R-CNN is exportable to ONNX for a fixed batch size with inputs images of fixed size. Example:: >>> model = torchvision.models.detection.keypointrcnn_resnet50_fpn(weights=KeypointRCNN_ResNet50_FPN_Weights.DEFAULT) >>> model.eval() >>> x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)] >>> predictions = model(x) >>> >>> # optionally, if you want to export the model to ONNX: >>> torch.onnx.export(model, x, "keypoint_rcnn.onnx", opset_version = 11) Args: weights (:class:`~torchvision.models.detection.KeypointRCNN_ResNet50_FPN_Weights`, optional): The pretrained weights to use. See :class:`~torchvision.models.detection.KeypointRCNN_ResNet50_FPN_Weights` below for more details, and possible values. By default, no pre-trained weights are used. progress (bool): If True, displays a progress bar of the download to stderr num_classes (int, optional): number of output classes of the model (including the background) num_keypoints (int, optional): number of keypoints weights_backbone (:class:`~torchvision.models.ResNet50_Weights`, optional): The pretrained weights for the backbone. trainable_backbone_layers (int, optional): number of trainable (not frozen) layers starting from final block. Valid values are between 0 and 5, with 5 meaning all backbone layers are trainable. If ``None`` is passed (the default) this value is set to 3. .. autoclass:: torchvision.models.detection.KeypointRCNN_ResNet50_FPN_Weights :members: """ weights = LineDetect_ResNet50_FPN_Weights.verify(weights) weights_backbone = ResNet50_Weights.verify(weights_backbone) # if weights_backbone is None: weights_backbone = ResNet18_Weights.IMAGENET1K_V1 if weights is not None: # weights_backbone = None num_classes = _ovewrite_value_param("num_classes", num_classes, len(weights.meta["categories"])) num_keypoints = _ovewrite_value_param("num_keypoints", num_keypoints, len(weights.meta["keypoint_names"])) else: if num_classes is None: num_classes = 2 if num_keypoints is None: num_keypoints = 2 is_trained = weights is not None or weights_backbone is not None trainable_backbone_layers = _validate_trainable_layers(is_trained, trainable_backbone_layers, 5, 3) norm_layer = misc_nn_ops.FrozenBatchNorm2d if is_trained else nn.BatchNorm2d backbone = resnet18(weights=weights_backbone, progress=progress, norm_layer=norm_layer) backbone = _resnet_fpn_extractor(backbone, trainable_backbone_layers) model = LineDetect(backbone, num_classes, num_keypoints=num_keypoints, **kwargs) if weights is not None: model.load_state_dict(weights.get_state_dict(progress=progress, check_hash=True)) if weights == LineDetect_ResNet50_FPN_Weights.COCO_V1: overwrite_eps(model, 0.0) return model def linedetect_resnet50_fpn( *, weights: Optional[LineDetect_ResNet50_FPN_Weights] = None, progress: bool = True, num_classes: Optional[int] = None, num_keypoints: Optional[int] = None, weights_backbone: Optional[ResNet50_Weights] = ResNet50_Weights.IMAGENET1K_V1, trainable_backbone_layers: Optional[int] = None, **kwargs: Any, ) -> LineDetect: """ Constructs a Keypoint R-CNN model with a ResNet-50-FPN backbone. .. betastatus:: detection module Reference: `Mask R-CNN `__. The input to the model is expected to be a list of tensors, each of shape ``[C, H, W]``, one for each image, and should be in ``0-1`` range. Different images can have different sizes. The behavior of the model changes depending on if it is in training or evaluation mode. During training, the model expects both the input tensors and targets (list of dictionary), containing: - boxes (``FloatTensor[N, 4]``): the ground-truth boxes in ``[x1, y1, x2, y2]`` format, with ``0 <= x1 < x2 <= W`` and ``0 <= y1 < y2 <= H``. - labels (``Int64Tensor[N]``): the class label for each ground-truth box - keypoints (``FloatTensor[N, K, 3]``): the ``K`` keypoints location for each of the ``N`` instances, in the format ``[x, y, visibility]``, where ``visibility=0`` means that the keypoint is not visible. The model returns a ``Dict[Tensor]`` during training, containing the classification and regression losses for both the RPN and the R-CNN, and the keypoint loss. During inference, the model requires only the input tensors, and returns the post-processed predictions as a ``List[Dict[Tensor]]``, one for each input image. The fields of the ``Dict`` are as follows, where ``N`` is the number of detected instances: - boxes (``FloatTensor[N, 4]``): the predicted boxes in ``[x1, y1, x2, y2]`` format, with ``0 <= x1 < x2 <= W`` and ``0 <= y1 < y2 <= H``. - labels (``Int64Tensor[N]``): the predicted labels for each instance - scores (``Tensor[N]``): the scores or each instance - keypoints (``FloatTensor[N, K, 3]``): the locations of the predicted keypoints, in ``[x, y, v]`` format. For more details on the output, you may refer to :ref:`instance_seg_output`. Keypoint R-CNN is exportable to ONNX for a fixed batch size with inputs images of fixed size. Example:: >>> model = torchvision.models.detection.linedetect_resnet50_fpn(weights=LineDetect_ResNet50_FPN_Weights.DEFAULT) >>> model.eval() >>> x = [torch.rand(3, 300, 400), torch.rand(3, 500, 400)] >>> predictions = model(x) >>> >>> # optionally, if you want to export the model to ONNX: >>> torch.onnx.export(model, x, "keypoint_rcnn.onnx", opset_version = 11) Args: weights (:class:`~torchvision.models.detection.KeypointRCNN_ResNet50_FPN_Weights`, optional): The pretrained weights to use. See :class:`~torchvision.models.detection.KeypointRCNN_ResNet50_FPN_Weights` below for more details, and possible values. By default, no pre-trained weights are used. progress (bool): If True, displays a progress bar of the download to stderr num_classes (int, optional): number of output classes of the model (including the background) num_keypoints (int, optional): number of keypoints weights_backbone (:class:`~torchvision.models.ResNet50_Weights`, optional): The pretrained weights for the backbone. trainable_backbone_layers (int, optional): number of trainable (not frozen) layers starting from final block. Valid values are between 0 and 5, with 5 meaning all backbone layers are trainable. If ``None`` is passed (the default) this value is set to 3. .. autoclass:: torchvision.models.detection.KeypointRCNN_ResNet50_FPN_Weights :members: """ weights = LineDetect_ResNet50_FPN_Weights.verify(weights) weights_backbone = ResNet50_Weights.verify(weights_backbone) if weights is not None: weights_backbone = None num_classes = _ovewrite_value_param("num_classes", num_classes, len(weights.meta["categories"])) num_keypoints = _ovewrite_value_param("num_keypoints", num_keypoints, len(weights.meta["keypoint_names"])) else: if num_classes is None: num_classes = 2 if num_keypoints is None: num_keypoints = 17 is_trained = weights is not None or weights_backbone is not None trainable_backbone_layers = _validate_trainable_layers(is_trained, trainable_backbone_layers, 5, 3) norm_layer = misc_nn_ops.FrozenBatchNorm2d if is_trained else nn.BatchNorm2d backbone = resnet50(weights=weights_backbone, progress=progress, norm_layer=norm_layer) backbone = _resnet_fpn_extractor(backbone, trainable_backbone_layers) model = LineDetect(backbone, num_classes, num_keypoints=num_keypoints, **kwargs) if weights is not None: model.load_state_dict(weights.get_state_dict(progress=progress, check_hash=True)) if weights == LineDetect_ResNet50_FPN_Weights.COCO_V1: overwrite_eps(model, 0.0) return model