#! /usr/bin/env python3 import rospy import os import hanqiangpose import command import subprocess import hjsx import threading import numpy as np import multiprocessing from moveit_msgs.msg import PlanningScene from dynamic_reconfigure.client import Client from rospy.exceptions import ROSException from std_srvs.srv import Empty from rospy.service import ServiceException import actionlib_msgs.msg._GoalStatusArray import sys import time def wait_for_topic(topic_name, message_type): try: message = rospy.wait_for_message(topic_name, message_type, timeout=None) return message except rospy.ROSException as e: rospy.logerr(f"等待话题 {topic_name} 时程序被中断: {e}") return None def clear_octomap(): try: clear_octomap_service = rospy.ServiceProxy('/clear_octomap', Empty) # 创建服务代理 clear_octomap_service() # 尝试调用服务 rospy.loginfo("Octomap has been cleared.") except ServiceException as e: rospy.loginfo("Failed to call /clear_octomap service or it is not available. Skipping operation.") waited_once = False tim_list=[] if __name__ == "__main__": command.load_rviz() rospy.init_node("set_update_paramter_p") # 初始化各种类型参数 rospy.set_param("cishu",0) rospy.set_param("time",0) rospy.set_param("sign_control",1) rospy.set_param("sign_pointcloud",0) rospy.set_param("folder_path","/home/tong/moveir_ws/data/123") #rospy.set_param("folder_path","/home/tong/moveit_ws/data/renyuan_2") rospy.loginfo("等待rviz启动") wait_for_topic("/execute_trajectory/status",actionlib_msgs.msg.GoalStatusArray) while not rospy.is_shutdown(): sign_control = str(rospy.get_param("sign_control")) if sign_control == "0": if not waited_once: #print("等待点云数据准备完成...") sys.stdin.flush()#清空键盘缓冲区 aaa=input("请选择 1-2 默认为2: ") if aaa=="": aaa='2' #aaa='2' rospy.set_param("cishu",rospy.get_param("cishu")+1) if(aaa =='1' or aaa=='2'): rospy.set_param("sign_control",aaa) else: command.close_rviz("/usr/bin/python3 /opt/ros/noetic/bin/roslaunch robot_config demo.launch") exit(0) waited_once = True elif sign_control == "1": # 重置参数 #rospy.set_param("sign_control",0) #在轨迹规划里重置 防止重复调用 #rospy.set_param("sign_traj_accepted",0) # 清除场景 clear_octomap() #点云计算并发布 process = multiprocessing.Process(target=command.launch_publish_pointcloud_background) process.start() input("请在rviz中查看点云,然后按回车键继续...") #计算焊接顺序和焊接姿态 hjsx.run() hanqiangpose.run() time.sleep(10) command.load_visual() # 等待 /move_group/monitored_planning_scene 话题发布消息 # rospy.loginfo("等待场景地图加载完毕...") # wait_for_topic('/move_group/monitored_planning_scene', PlanningScene) # rospy.loginfo(f"场景地图已加载完毕") rospy.set_param("sign_pointcloud",1) #rospy.loginfo("终止点云发布,关闭发布窗口") rospy.loginfo("运行moveitserver2.py") process = multiprocessing.Process(target=command.launch_moveit_control_server_background) process.start() while rospy.get_param("sign_control"): pass #运行结束,重置参数 waited_once = False elif sign_control == "2": tim=time.time() rospy.set_param("time",0) # 重置参数 #rospy.set_param("sign_control",0) #rospy.set_param("sign_traj_accepted",0) hjsx.run() hanqiangpose.run() command.load_visual() process = multiprocessing.Process(target=command.launch_moveit_control_server_background) process.start() while rospy.get_param("sign_control"): pass # tim=time.time()-tim # tim_list.append(tim) # print("-------------------------------------") # print(f'第{rospy.get_param("cishu")}次规划:') # print(tim_list) # print("") #运行结束,重置参数 waited_once = False