# 导入基本ros和moveit库 from logging.handlers import RotatingFileHandler import os import moveit_msgs.msg import rospy, sys import moveit_commander import moveit_msgs from moveit_commander import MoveGroupCommander, PlanningSceneInterface, RobotCommander from moveit_msgs.msg import PlanningScene, ObjectColor,CollisionObject, AttachedCollisionObject,RobotTrajectory from moveit_msgs.msg import RobotState from moveit_msgs.msg import Constraints, PositionConstraint, OrientationConstraint from shape_msgs.msg import SolidPrimitive from sensor_msgs.msg import JointState import tf.transformations from trajectory_msgs.msg import JointTrajectory,JointTrajectoryPoint from geometry_msgs.msg import PoseStamped, Pose,Point,Vector3 from copy import deepcopy import numpy as np import tf from tf.transformations import euler_from_quaternion,quaternion_from_euler,quaternion_from_euler, quaternion_multiply import math import traceback from lstrobot_planning.msg import JointTrajectoryPoint_ex,JointTrajectory_ex import json import termios pi = np.pi class MoveIt_Control: # 初始化程序 def __init__(self, is_use_gripper=False): self.home_pose=[] self.hf_num=0#焊缝计数 从0开始 self.hf_fail=[] # Init ros config moveit_commander.roscpp_initialize(sys.argv) self.arm = moveit_commander.MoveGroupCommander('manipulator') self.arm.set_goal_joint_tolerance(0.0001) self.arm.set_goal_position_tolerance(0.0005) self.arm.set_goal_orientation_tolerance(0.1) #self.arm.set_planner_id("RRTConnet")#不知道能不能用 self.end_effector_link = self.arm.get_end_effector_link() # 设置机械臂基座的参考系 self.reference_frame = 'base_link' self.arm.set_pose_reference_frame(self.reference_frame) # # 设置最大规划时间和是否允许重新规划 self.arm.set_planning_time(10.0) self.arm.allow_replanning(True) self.arm.set_max_acceleration_scaling_factor(1) self.arm.set_max_velocity_scaling_factor(1) self.arm.set_num_planning_attempts(10) # 机械臂初始姿态 #self.move_j() self.go_home() self.home_pose=self.get_now_pose() # 关节规划,输入6个关节角度(单位:弧度) def move_j(self, joint_configuration=None,a=1,v=1): # 设置机械臂的目标位置,使用六轴的位置数据进行描述(单位:弧度) if joint_configuration==None: joint_configuration = [0, 0, 0, 0, 0, 0] self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) self.arm.set_joint_value_target(joint_configuration) rospy.loginfo("move_j:"+str(joint_configuration)) self.arm.go() #rospy.sleep(1) def get_now_pose(self): # 获取机械臂当前位姿 current_pose = self.arm.get_current_pose(self.end_effector_link).pose pose=[] pose.append(current_pose.position.x) pose.append(current_pose.position.y) pose.append(current_pose.position.z) pose.append(current_pose.orientation.x) pose.append(current_pose.orientation.y) pose.append(current_pose.orientation.z) pose.append(current_pose.orientation.w) # 打印位姿信息 #rospy.lohome_poseinfo("Current Pose: {}".format(pose)) return pose def create_path_constraints2(self,start_point,end_point,r):#创建路径约束 #计算起点指向终点的向量 vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]]) height = np.linalg.norm(vector)+0.16 #高度延长16cm radius = r # 位置约束 position_constraint = PositionConstraint()#创建点位约束 position_constraint.header.frame_id = "base_link"#此约束所指的机器人链接 position_constraint.link_name = self.end_effector_link position_constraint.target_point_offset = Vector3(0, 0, 0)#目标点的偏移量 position_constraint.weight = 1.0#质量 OR 加权因子? #构建 shape_msgs/SolidPrimitive 消息 bounding_volume = SolidPrimitive() bounding_volume.type = SolidPrimitive.CYLINDER#圆柱 # bounding_volume.dimensions = [0.003,1] bounding_volume.dimensions = [height,radius]#尺寸 高度 半径 position_constraint.constraint_region.primitives.append(bounding_volume) #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态 pose = Pose() pose.position.x = start_point[0] + vector[0] / 2#定义位置(找了个几何中心) pose.position.y = start_point[1] + vector[1] / 2 pose.position.z = start_point[2] + vector[2] / 2 # 计算圆柱体的姿态 z_axis = np.array([0, 0, 1]) axis = np.cross(z_axis, vector) #计算两个向量(向量数组)的叉乘 叉乘返回的数组既垂直于a,又垂直于b angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector))#反余弦求角度 q = tf.transformations.quaternion_about_axis(angle, axis)#通过旋转轴和旋转角返回四元数 pose.orientation.x = q[0] pose.orientation.y = q[1] pose.orientation.z = q[2] pose.orientation.w = q[3] position_constraint.constraint_region.primitive_poses.append(pose) constraints = Constraints()#创建一个新的约束信息 constraints.position_constraints.append(position_constraint) # constraints.orientation_constraints.append(orientation_constraint) return constraints def move_p_flexible(self,point,points,trajectory,trajectory_with_type,v=1,a=1): self.arm.set_max_velocity_scaling_factor(v) self.arm.set_max_acceleration_scaling_factor(a) rrr=0.05#初始允许半径 if trajectory: #起点位置设置为规划组最后一个点 state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions path_constraints = self.create_path_constraints2(points[-1],point,rrr)#将当前的点和输入的点(焊缝的起始点)做圆柱轨迹约束 else: #刚开始规划 起点位置设定为当前状态 按理来说是home点 self.go_home() self.home_pose=self.get_now_pose() state = self.arm.get_current_state() path_constraints = self.create_path_constraints2(self.home_pose,point,rrr)#将当前的点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 self.arm.set_pose_target(point, self.end_effector_link)#设定目标点为传入的点 self.arm.set_start_state(state)#起点位置设置为规划组最后一个点 或者当前状态(第一个点时) #尝试规划次数 b = 10 #尝试规划5次 无约束5次 for i in range(b): traj = self.arm.plan()#调用plan进行规划 error=traj[3] if error.val == moveit_msgs.msg._MoveItErrorCodes.MoveItErrorCodes.SUCCESS: rospy.loginfo("本次移动OK") trajj = traj[1] #取出规划的信息 trajj_with_type = mark_the_traj(trajj,"during-p",welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" trajectory_with_type.append(trajj_with_type) points.append(point) trajectory.append(trajj) break else: rospy.loginfo(error) rospy.loginfo("移动规划失败-开始第{}次重新规划".format(i+1)) self.arm.clear_path_constraints() rrr=rrr+0.2#每次增加10厘米 rospy.loginfo("R值:{}".format(rrr)) if trajectory: path_constraints = self.create_path_constraints2(points[-1],point,rrr)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束 else: path_constraints = self.create_path_constraints2(self.home_pose,point,rrr)#将最后一个点和起点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 if(i>=(b-5)): rospy.loginfo("所有移动规划尝试失败 取消路径约束") self.arm.clear_path_constraints() if(i==(b-1)): rospy.loginfo("所有移动规划尝试失败 程序退出") rospy.set_param("sign_control",0) sys.exit(0) #清除约束 self.arm.clear_path_constraints() return points,trajectory,trajectory_with_type def move_pl(self,point,points,trajectory,trajectory_with_type,v=1,a=1):#焊缝的规划 只规划当前的点到上一个点 self.arm.set_max_velocity_scaling_factor(v) self.arm.set_max_acceleration_scaling_factor(a) #设定约束 if trajectory: #起点位置设置为规划组最后一个点 state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) self.arm.set_pose_target(point, self.end_effector_link)#设定目标点为传入的点 # 创建路径约束 path_constraints = self.create_path_constraints(points[-1],point)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 traj = self.arm.plan()#调用plan进行规划 error=traj[3] if error.val == moveit_msgs.msg._MoveItErrorCodes.MoveItErrorCodes.SUCCESS: rospy.loginfo("本次焊缝规划 OK") trajj = traj[1] #取出规划的信息 trajj_with_type = mark_the_traj(trajj,"during-l",welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" points.append(point) trajectory.append(trajj) trajectory_with_type.append(trajj_with_type) else: rospy.loginfo("本次焊缝规划失败") points.pop()#将本条焊缝的起点从规划中删除 trajectory.pop() trajectory_with_type.pop() self.hf_fail.append(welding_sequence[self.hf_num]) #清除约束 self.arm.clear_path_constraints() self.hf_num=self.hf_num+1#焊缝计数 return points,trajectory,trajectory_with_type def create_path_constraints(self,start_point,end_point): #计算起点指向终点的向量 vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]]) height = np.linalg.norm(vector)+0.002 radius = 0.001 constraints = Constraints() # 位置约束 position_constraint = PositionConstraint() position_constraint.header.frame_id = "base_link" position_constraint.link_name = self.end_effector_link position_constraint.target_point_offset = Vector3(0, 0, 0) #构建 shape_msgs/SolidPrimitive 消息 bounding_volume = SolidPrimitive() bounding_volume.type = SolidPrimitive.CYLINDER # bounding_volume.dimensions = [0.003,1] bounding_volume.dimensions = [height,radius] #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态 pose = Pose() pose.position.x = start_point[0] + vector[0] / 2 pose.position.y = start_point[1] + vector[1] / 2 pose.position.z = start_point[2] + vector[2] / 2 # 计算圆柱体的旋转矩阵 z_axis = np.array([0, 0, 1]) axis = np.cross(z_axis, vector) angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector)) q = tf.transformations.quaternion_about_axis(angle, axis) pose.orientation.x = q[0] pose.orientation.y = q[1] pose.orientation.z = q[2] pose.orientation.w = q[3] position_constraint.constraint_region.primitives.append(bounding_volume) position_constraint.constraint_region.primitive_poses.append(pose) position_constraint.weight = 1.0 constraints.position_constraints.append(position_constraint) return constraints def go_home(self,a=1,v=1): rospy.loginfo("go_home start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) self.arm.set_named_target('home') rospy.loginfo("get_home end") self.arm.go() rospy.loginfo("go_home end") # rospy.sleep(1) def go_home_justplan(self,trajectory,trajectory_with_type,a=1,v=1): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) self.arm.set_named_target('home') traj = self.arm.plan() trajj = traj[1] traj_with_type = mark_the_traj(trajj,"go-home",welding_sequence) trajectory.append(trajj) trajectory_with_type.append(traj_with_type) return trajectory,trajectory_with_type def path_planning(self,folder_path,gohome=True): file_path_result = os.path.join(folder_path, 'result.txt') all_data = process_welding_data(file_path_result) points,trajectory,trajectory_with_type = [],[],[] for i in range(len(all_data)): rospy.loginfo("共读取到%d条焊缝,开始规划第%d条",len(all_data),i+1) start_point = all_data[i][0] end_point = all_data[i][1] q1 = all_data[i][2] q2 = all_data[i][3] point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0],q1[1],q1[2],q1[3]] points,trajectory,trajectory_with_type = self.move_p_flexible(point11,points,trajectory,trajectory_with_type) point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0],q2[1],q2[2],q2[3]] points,trajectory,trajectory_with_type = self.move_pl(point22,points,trajectory,trajectory_with_type, v=v, a=a) rospy.loginfo("第%d条焊缝规划完毕",i+1) if gohome: points,trajectory,trajectory_with_type = self.move_p_flexible(self.home_pose,points,trajectory,trajectory_with_type) trajectory,trajectory_with_type = self.go_home_justplan(trajectory,trajectory_with_type) traj_merge= merge_trajectories(trajectory) trajectory_with_type_merge= merge_trajectories_with_type(trajectory_with_type) rospy.loginfo("All of The trajectory Plan successfully") return trajectory,traj_merge,trajectory_with_type_merge ###########################################################class end############################################################# def process_welding_data(filename): all_data = [] # 用来存储每一行处理后的数据 with open(filename, 'r') as file: for line in file: parts = line.strip().split('/') coordinates_and_poses = [part.split(',') for part in parts[1:]] start_point = tuple(map(float, coordinates_and_poses[0][:3])) # 使用元组以避免修改 end_point = tuple(map(float, coordinates_and_poses[1][:3])) q1 = tuple(map(float, coordinates_and_poses[2][:4])) q2 = tuple(map(float, coordinates_and_poses[3][:4])) # 收集每行处理后的数据 all_data.append((start_point, end_point, q1, q2)) return all_data def mark_the_traj(traj,TYPE,SEQUENCE): traj_with_type = JointTrajectory_ex() traj_with_type.header = traj.joint_trajectory.header traj_with_type.joint_names = traj.joint_trajectory.joint_names traj_with_type.points = [ JointTrajectoryPoint_ex( positions=point.positions, velocities=point.velocities, accelerations=point.accelerations, effort=point.effort, type=TYPE, sequence=SEQUENCE ) for point in traj.joint_trajectory.points ] return traj_with_type def merge_trajectories(trajectories): if not trajectories: return None # 创建一个新的 JointTrajectory 对象 merged_trajectory = JointTrajectory() merged_trajectory.header = trajectories[0].joint_trajectory.header merged_trajectory.joint_names = trajectories[0].joint_trajectory.joint_names # 初始化时间累加器 last_time_from_start = rospy.Duration(0) # 合并所有 trajectories 的 points for traj in trajectories: for point in traj.joint_trajectory.points: # 创建新的轨迹点 new_point = deepcopy(point) # 累加时间 new_point.time_from_start += last_time_from_start merged_trajectory.points.append(new_point) # 更新时间累加器为当前轨迹的最后一个点的时间 if traj.joint_trajectory.points: last_time_from_start = traj.joint_trajectory.points[-1].time_from_start + last_time_from_start return merged_trajectory def merge_trajectories_with_type(trajectory_with_type): if not trajectory_with_type: return None # 创建一个新的 JointTrajectory 对象 merged_trajectory_with_type = JointTrajectory_ex() merged_trajectory_with_type.header = trajectory_with_type[0].header merged_trajectory_with_type.joint_names = trajectory_with_type[0].joint_names # 初始化时间累加器 last_time_from_start = rospy.Duration(0) # 合并所有 trajectories 的 points for traj in trajectory_with_type: for point in traj.points: # 创建新的轨迹点 new_point = deepcopy(point) # 累加时间 new_point.time_from_start += last_time_from_start merged_trajectory_with_type.points.append(new_point) # 更新时间累加器为当前轨迹的最后一个点的时间 if traj.points: last_time_from_start = traj.points[-1].time_from_start + last_time_from_start return merged_trajectory_with_type def pub_trajectories(trajectory_with_type_merge): pub = rospy.Publisher("/joint_path", JointTrajectory_ex, queue_size=5) count = 1 rate = rospy.Rate(1) rospy.loginfo("正在发布轨迹信息..") while not rospy.is_shutdown(): #轨迹接受完毕,关闭当前发布 sign_traj_accepted = str(rospy.get_param("sign_traj_accepted")) if sign_traj_accepted == "1": rospy.set_param("sign_traj_accepted",0) break pub.publish(trajectory_with_type_merge) # rospy.loginfo("发布计数,count = %d",count) # count += 1 rate.sleep() class py_msgs(): fial=[] shun_xv=[] class point(): xyz_list=[] type=[] def ROS2PY_msgs(msgs, m_sr): for i in range(len(msgs.points)): py_msgs.point.xyz_list.append(msgs.points[i].positions) py_msgs.point.type.append(msgs.points[i].type) py_msgs.fail = m_sr.hf_fail.copy() py_msgs.shun_xv = msgs.points[0].sequence.copy() # for i in range(len(py_msgs.point.type)): # print(py_msgs.point.xyz_list[i]) # print(py_msgs.point.type[i]) message = { 'positions': py_msgs.point.xyz_list, # 规划路径点结果 'flags': py_msgs.point.type, # 规划路径点的类型,焊接点移动点 'weld_order': py_msgs.shun_xv, # 规划路径顺序 'failed': py_msgs.fail, # 规划路径失败的焊缝 } return json.dumps(message) # for i in range(len(py_msgs.point.type)): # moveit_server.move_j(py_msgs.point.xyz_list[i]) # #input("任意键继续") def get_user_input(): """获取用户输入的速度和加速度缩放因子,并在控制台中打印信息和错误提示""" DEFUALT_V = 1.0 DEFUALT_A = 1.0 def get_valid_input(factor, default): while True: user_input=input(factor) if user_input=="": return default try: value=float(user_input) if 0<=value<=1: return value else: rospy.logerr("缩放因子必须在[0, 1]范围内,请重新输入!") except ValueError: rospy.logerr("输入值无效,请根据提示重新输入!") try: termios.tcflush(sys.stdin, termios.TCIFLUSH) #清空输入缓存 rospy.loginfo("输入缓存区已清空!") v=get_valid_input("请输入速度缩放因子(0-1, 默认为1): ", DEFUALT_V) a=get_valid_input("请输入加速度缩放因子(0-1, 默认为1): ", DEFUALT_A) rospy.loginfo(f"用户输入的速度缩放因子为{v}, 加速度缩放因子为{a}") return v, a except Exception as e: rospy.logerr(f"发生错误:{e}") if __name__ =="__main__": # from redis_publisher import Publisher # publisher = Publisher() folder_path = rospy.get_param("folder_path") rospy.init_node('moveit_control_server', anonymous=False) moveit_server = MoveIt_Control(is_use_gripper=False) welding_sequence = rospy.get_param('welding_sequence') v,a=get_user_input() trajectory,trajectory_merge,trajectory_with_type_merge = moveit_server.path_planning(folder_path) # plan_result = ROS2PY_msgs(trajectory_with_type_merge, moveit_server) # publisher.pub_plan_result(plan_result) moveit_server.arm.execute(trajectory_merge) rospy.set_param("sign_control",0)