speed_estimation.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # Ultralytics 🚀 AGPL-3.0 License - https://ultralytics.com/license
  2. from time import time
  3. import numpy as np
  4. from ultralytics.solutions.solutions import BaseSolution
  5. from ultralytics.utils.plotting import Annotator, colors
  6. class SpeedEstimator(BaseSolution):
  7. """
  8. A class to estimate the speed of objects in a real-time video stream based on their tracks.
  9. This class extends the BaseSolution class and provides functionality for estimating object speeds using
  10. tracking data in video streams.
  11. Attributes:
  12. spd (Dict[int, float]): Dictionary storing speed data for tracked objects.
  13. trkd_ids (List[int]): List of tracked object IDs that have already been speed-estimated.
  14. trk_pt (Dict[int, float]): Dictionary storing previous timestamps for tracked objects.
  15. trk_pp (Dict[int, Tuple[float, float]]): Dictionary storing previous positions for tracked objects.
  16. annotator (Annotator): Annotator object for drawing on images.
  17. region (List[Tuple[int, int]]): List of points defining the speed estimation region.
  18. track_line (List[Tuple[float, float]]): List of points representing the object's track.
  19. r_s (LineString): LineString object representing the speed estimation region.
  20. Methods:
  21. initialize_region: Initializes the speed estimation region.
  22. estimate_speed: Estimates the speed of objects based on tracking data.
  23. store_tracking_history: Stores the tracking history for an object.
  24. extract_tracks: Extracts tracks from the current frame.
  25. display_output: Displays the output with annotations.
  26. Examples:
  27. >>> estimator = SpeedEstimator()
  28. >>> frame = cv2.imread("frame.jpg")
  29. >>> processed_frame = estimator.estimate_speed(frame)
  30. >>> cv2.imshow("Speed Estimation", processed_frame)
  31. """
  32. def __init__(self, **kwargs):
  33. """Initializes the SpeedEstimator object with speed estimation parameters and data structures."""
  34. super().__init__(**kwargs)
  35. self.initialize_region() # Initialize speed region
  36. self.spd = {} # set for speed data
  37. self.trkd_ids = [] # list for already speed_estimated and tracked ID's
  38. self.trk_pt = {} # set for tracks previous time
  39. self.trk_pp = {} # set for tracks previous point
  40. def estimate_speed(self, im0):
  41. """
  42. Estimates the speed of objects based on tracking data.
  43. Args:
  44. im0 (np.ndarray): Input image for processing. Shape is typically (H, W, C) for RGB images.
  45. Returns:
  46. (np.ndarray): Processed image with speed estimations and annotations.
  47. Examples:
  48. >>> estimator = SpeedEstimator()
  49. >>> image = np.random.randint(0, 255, (480, 640, 3), dtype=np.uint8)
  50. >>> processed_image = estimator.estimate_speed(image)
  51. """
  52. self.annotator = Annotator(im0, line_width=self.line_width) # Initialize annotator
  53. self.extract_tracks(im0) # Extract tracks
  54. self.annotator.draw_region(
  55. reg_pts=self.region, color=(104, 0, 123), thickness=self.line_width * 2
  56. ) # Draw region
  57. for box, track_id, cls in zip(self.boxes, self.track_ids, self.clss):
  58. self.store_tracking_history(track_id, box) # Store track history
  59. # Check if track_id is already in self.trk_pp or trk_pt initialize if not
  60. if track_id not in self.trk_pt:
  61. self.trk_pt[track_id] = 0
  62. if track_id not in self.trk_pp:
  63. self.trk_pp[track_id] = self.track_line[-1]
  64. speed_label = f"{int(self.spd[track_id])} km/h" if track_id in self.spd else self.names[int(cls)]
  65. self.annotator.box_label(box, label=speed_label, color=colors(track_id, True)) # Draw bounding box
  66. # Draw tracks of objects
  67. self.annotator.draw_centroid_and_tracks(
  68. self.track_line, color=colors(int(track_id), True), track_thickness=self.line_width
  69. )
  70. # Calculate object speed and direction based on region intersection
  71. if self.LineString([self.trk_pp[track_id], self.track_line[-1]]).intersects(self.r_s):
  72. direction = "known"
  73. else:
  74. direction = "unknown"
  75. # Perform speed calculation and tracking updates if direction is valid
  76. if direction == "known" and track_id not in self.trkd_ids:
  77. self.trkd_ids.append(track_id)
  78. time_difference = time() - self.trk_pt[track_id]
  79. if time_difference > 0:
  80. self.spd[track_id] = np.abs(self.track_line[-1][1] - self.trk_pp[track_id][1]) / time_difference
  81. self.trk_pt[track_id] = time()
  82. self.trk_pp[track_id] = self.track_line[-1]
  83. self.display_output(im0) # display output with base class function
  84. return im0 # return output image for more usage