| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182 |
- #!/usr/bin/env python3
- """
- FIXED:
- 1. 正确更新 PyVista 点云(使用 mesh.copy_from)
- 2. 安全退出(避免卡死)
- 3. 兼容 Fast-LIVO 的 /cloud_registered_rgb
- """
- import sys
- import rclpy
- from rclpy.node import Node
- from sensor_msgs.msg import PointCloud2
- from sensor_msgs_py import point_cloud2
- import numpy as np
- import pyvista as pv
- from PySide2.QtWidgets import (
- QApplication, QMainWindow, QVBoxLayout, QWidget, QLabel, QPushButton
- )
- from PySide2.QtCore import QThread, Signal, Slot, QTimer
- from pyvistaqt import QtInteractor
- class SLAMViewerNode(Node):
- def __init__(self, callback):
- super().__init__('slam_qt_viewer_node')
- self.callback = callback
- self.subscription = self.create_subscription(
- PointCloud2,
- '/cloud_registered_rgb',
- self.listener_callback,
- 10
- )
- self.get_logger().info("Subscribed to /cloud_registered_rgb")
- def listener_callback(self, msg):
- points = []
- colors = []
- for pt in point_cloud2.read_points(msg, field_names=('x', 'y', 'z', 'rgb'), skip_nans=True):
- x, y, z = pt[0], pt[1], pt[2]
- rgb_packed = pt[3]
- if np.isnan(rgb_packed):
- r, g, b = 255, 255, 255
- else:
- rgb_uint = np.float32(rgb_packed).view('uint32')
- r = (rgb_uint >> 16) & 0xFF
- g = (rgb_uint >> 8) & 0xFF
- b = (rgb_uint >> 0) & 0xFF
- points.append([x, y, z])
- colors.append([r, g, b])
- if points:
- pts = np.array(points, dtype=np.float32)
- cols = np.array(colors, dtype=np.uint8)
- self.callback(pts, cols)
- class ROS2SubscriberThread(QThread):
- new_cloud = Signal(np.ndarray, np.ndarray)
- def __init__(self):
- super().__init__()
- self.node = None
- self._running = True
- def run(self):
- try:
- rclpy.init()
- self.node = SLAMViewerNode(self.new_cloud.emit)
- while self._running and rclpy.ok():
- rclpy.spin_once(self.node, timeout_sec=0.1) # 非阻塞,可响应退出
- except KeyboardInterrupt:
- pass
- finally:
- if rclpy.ok():
- rclpy.shutdown()
- def stop(self):
- self._running = False
- if self.node:
- self.node.destroy_node()
- self.quit()
- self.wait(2000) # 最多等2秒
- class MainWindow(QMainWindow):
- def __init__(self):
- super().__init__()
- self.setWindowTitle("SLAM Viewer - /cloud_registered_rgb (FIXED)")
- self.setGeometry(100, 100, 1000, 700)
- central_widget = QWidget()
- self.setCentralWidget(central_widget)
- layout = QVBoxLayout(central_widget)
- self.plotter = QtInteractor(self)
- layout.addWidget(self.plotter)
- self.status_label = QLabel("Waiting for /cloud_registered_rgb...")
- layout.addWidget(self.status_label)
- exit_btn = QPushButton("Exit")
- exit_btn.clicked.connect(self.close)
- layout.addWidget(exit_btn)
- # 初始化一个空点云(必须有有效数据才能显示)
- initial_pts = np.array([[0.0, 0.0, 0.0]], dtype=np.float32)
- initial_cols = np.array([[0, 0, 0]], dtype=np.uint8)
- self.cloud = pv.PolyData(initial_pts)
- self.cloud.point_data['colors'] = initial_cols
- self.actor = self.plotter.add_points(
- self.cloud,
- scalars='colors',
- rgb=True,
- point_size=2.0,
- render_points_as_spheres=False
- )
- self.plotter.add_axes()
- self.plotter.show_grid(color='white')
- self.plotter.background_color = 'black'
- self.last_update_time = 0.0
- self.update_interval = 1.0
- self.latest_data = None
- self.ros_thread = ROS2SubscriberThread()
- self.ros_thread.new_cloud.connect(self.on_new_cloud_received)
- self.ros_thread.start()
- self.timer = QTimer()
- self.timer.timeout.connect(self.try_update_plot)
- self.timer.start(100)
- @Slot(np.ndarray, np.ndarray)
- def on_new_cloud_received(self, points, colors):
- self.latest_data = (points, colors)
- def try_update_plot(self):
- import time
- current_time = time.time()
- if self.latest_data is not None and (current_time - self.last_update_time) >= self.update_interval:
- pts, cols = self.latest_data
- n_pts = pts.shape[0]
- # ✅ 关键修复:创建新 PolyData 并替换
- new_cloud = pv.PolyData(pts)
- new_cloud.point_data['colors'] = cols
- # 替换 actor 的输入
- self.actor.GetMapper().SetInputData(new_cloud)
- self.plotter.render()
- self.status_label.setText(f"Updated: {n_pts:,} points | Topic: /cloud_registered_rgb")
- self.last_update_time = current_time
- self.latest_data = None
- def closeEvent(self, event):
- # 先停定时器
- self.timer.stop()
- # 停止 ROS 线程(关键!)
- self.ros_thread.stop()
- # 再关闭渲染器
- try:
- self.plotter.close()
- except Exception:
- pass
- event.accept()
- def main():
- app = QApplication(sys.argv)
- window = MainWindow()
- window.show()
- sys.exit(app.exec_())
- if __name__ == '__main__':
- main()
|