#!/usr/bin/env python3 """ FIXED: 1. 正确更新 PyVista 点云(使用 mesh.copy_from) 2. 安全退出(避免卡死) 3. 兼容 Fast-LIVO 的 /cloud_registered_rgb """ import sys import rclpy from rclpy.node import Node from sensor_msgs.msg import PointCloud2 from sensor_msgs_py import point_cloud2 import numpy as np import pyvista as pv from PySide2.QtWidgets import ( QApplication, QMainWindow, QVBoxLayout, QWidget, QLabel, QPushButton ) from PySide2.QtCore import QThread, Signal, Slot, QTimer from pyvistaqt import QtInteractor class SLAMViewerNode(Node): def __init__(self, callback): super().__init__('slam_qt_viewer_node') self.callback = callback self.subscription = self.create_subscription( PointCloud2, '/cloud_registered_rgb', self.listener_callback, 10 ) self.get_logger().info("Subscribed to /cloud_registered_rgb") def listener_callback(self, msg): points = [] colors = [] for pt in point_cloud2.read_points(msg, field_names=('x', 'y', 'z', 'rgb'), skip_nans=True): x, y, z = pt[0], pt[1], pt[2] rgb_packed = pt[3] if np.isnan(rgb_packed): r, g, b = 255, 255, 255 else: rgb_uint = np.float32(rgb_packed).view('uint32') r = (rgb_uint >> 16) & 0xFF g = (rgb_uint >> 8) & 0xFF b = (rgb_uint >> 0) & 0xFF points.append([x, y, z]) colors.append([r, g, b]) if points: pts = np.array(points, dtype=np.float32) cols = np.array(colors, dtype=np.uint8) self.callback(pts, cols) class ROS2SubscriberThread(QThread): new_cloud = Signal(np.ndarray, np.ndarray) def __init__(self): super().__init__() self.node = None self._running = True def run(self): try: rclpy.init() self.node = SLAMViewerNode(self.new_cloud.emit) while self._running and rclpy.ok(): rclpy.spin_once(self.node, timeout_sec=0.1) # 非阻塞,可响应退出 except KeyboardInterrupt: pass finally: if rclpy.ok(): rclpy.shutdown() def stop(self): self._running = False if self.node: self.node.destroy_node() self.quit() self.wait(2000) # 最多等2秒 class MainWindow(QMainWindow): def __init__(self): super().__init__() self.setWindowTitle("SLAM Viewer - /cloud_registered_rgb (FIXED)") self.setGeometry(100, 100, 1000, 700) central_widget = QWidget() self.setCentralWidget(central_widget) layout = QVBoxLayout(central_widget) self.plotter = QtInteractor(self) layout.addWidget(self.plotter) self.status_label = QLabel("Waiting for /cloud_registered_rgb...") layout.addWidget(self.status_label) exit_btn = QPushButton("Exit") exit_btn.clicked.connect(self.close) layout.addWidget(exit_btn) # 初始化一个空点云(必须有有效数据才能显示) initial_pts = np.array([[0.0, 0.0, 0.0]], dtype=np.float32) initial_cols = np.array([[0, 0, 0]], dtype=np.uint8) self.cloud = pv.PolyData(initial_pts) self.cloud.point_data['colors'] = initial_cols self.actor = self.plotter.add_points( self.cloud, scalars='colors', rgb=True, point_size=2.0, render_points_as_spheres=False ) self.plotter.add_axes() self.plotter.show_grid(color='white') self.plotter.background_color = 'black' self.last_update_time = 0.0 self.update_interval = 1.0 self.latest_data = None self.ros_thread = ROS2SubscriberThread() self.ros_thread.new_cloud.connect(self.on_new_cloud_received) self.ros_thread.start() self.timer = QTimer() self.timer.timeout.connect(self.try_update_plot) self.timer.start(100) @Slot(np.ndarray, np.ndarray) def on_new_cloud_received(self, points, colors): self.latest_data = (points, colors) def try_update_plot(self): import time current_time = time.time() if self.latest_data is not None and (current_time - self.last_update_time) >= self.update_interval: pts, cols = self.latest_data n_pts = pts.shape[0] # ✅ 关键修复:创建新 PolyData 并替换 new_cloud = pv.PolyData(pts) new_cloud.point_data['colors'] = cols # 替换 actor 的输入 self.actor.GetMapper().SetInputData(new_cloud) self.plotter.render() self.status_label.setText(f"Updated: {n_pts:,} points | Topic: /cloud_registered_rgb") self.last_update_time = current_time self.latest_data = None def closeEvent(self, event): # 先停定时器 self.timer.stop() # 停止 ROS 线程(关键!) self.ros_thread.stop() # 再关闭渲染器 try: self.plotter.close() except Exception: pass event.accept() def main(): app = QApplication(sys.argv) window = MainWindow() window.show() sys.exit(app.exec_()) if __name__ == '__main__': main()