import rclpy from rclpy.node import Node from sensor_msgs.msg import PointCloud2 import numpy as np import open3d as o3d from pyvistaqt import BackgroundPlotter import pyvista as pv import queue import threading class LidarViewer(Node): def __init__(self, data_queue): super().__init__('lidar_pyvista_viewer') self.data_queue = data_queue self.subscription = self.create_subscription( PointCloud2, '/rs_lidar/points', # 替换为你的 topic self.pointcloud_callback, 10 ) self.get_logger().info('等待点云数据...') def pointcloud_callback(self, msg): # 这里只做快速解析,不进行可视化操作! points = self.pointcloud2_to_array(msg) if points is not None: # 将点云放入队列(线程安全) self.data_queue.put(points) self.get_logger().info(f'显示 {len(points)} 个点') def pointcloud2_to_array(self, msg): # 简化版:假设是 XYZ 格式(根据你的实际格式调整) try: data = np.frombuffer(msg.data, dtype=np.float32) points = data.reshape(-1, 4)[:, :3] # 取 x, y, z return points except Exception as e: self.get_logger().error(f'解析失败: {e}') return None def main(): rclpy.init() data_queue = queue.Queue() node = LidarViewer(data_queue) # 启动 ROS 2 spin 线程(非阻塞) ros_thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True) ros_thread.start() # 初始化 PyVista 可视化(主线程) plotter = BackgroundPlotter() plotter.add_axes() point_cloud_actor = None try: while rclpy.ok(): # 从队列取最新点云(非阻塞) try: points = data_queue.get_nowait() cloud = pv.PolyData(points) # 如果之前有点云 actor,先移除它 if point_cloud_actor is not None: plotter.remove_actor(point_cloud_actor) # 添加新的点云并获取新的 actor point_cloud_actor = plotter.add_points( cloud, render_points_as_spheres=False, point_size=2 ) # 自动缩放到包含所有点云的范围 plotter.reset_camera() except queue.Empty: pass # 让 PyVista 处理 GUI 事件(关键!) plotter.app.processEvents() # 短暂休眠避免 CPU 占用过高 import time time.sleep(0.01) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown() plotter.close() if __name__ == '__main__': main()