#!/usr/bin/env python3 # -*- coding: utf-8 -*- # 导入基本ros和moveit库 from logging.handlers import RotatingFileHandler import os import rospy, sys import moveit_commander import moveit_msgs from moveit_commander import MoveGroupCommander, PlanningSceneInterface, RobotCommander from moveit_msgs.msg import PlanningScene, ObjectColor,CollisionObject, AttachedCollisionObject,RobotTrajectory from moveit_msgs.msg import RobotState from moveit_msgs.msg import Constraints, PositionConstraint, OrientationConstraint from shape_msgs.msg import SolidPrimitive from sensor_msgs.msg import JointState import tf.transformations from trajectory_msgs.msg import JointTrajectory,JointTrajectoryPoint from geometry_msgs.msg import PoseStamped, Pose,Point,Vector3 from copy import deepcopy import numpy as np import tf from tf.transformations import euler_from_quaternion,quaternion_from_euler,quaternion_from_euler, quaternion_multiply import math import traceback import check from lstrobot_planning.msg import JointTrajectoryPoint_ex,JointTrajectory_ex pi = np.pi class MoveIt_Control: # 初始化程序 def __init__(self, is_use_gripper=False): self.home_pose=[] # Init ros config moveit_commander.roscpp_initialize(sys.argv) # 初始化ROS节点 rospy.init_node('moveit_control_server', anonymous=False) self.arm = moveit_commander.MoveGroupCommander('manipulator') self.arm.set_goal_joint_tolerance(0.0001) self.arm.set_goal_position_tolerance(0.0005) self.arm.set_goal_orientation_tolerance(0.1) self.end_effector_link = self.arm.get_end_effector_link() # 设置机械臂基座的参考系 self.reference_frame = 'base_link' self.arm.set_pose_reference_frame(self.reference_frame) # # 设置最大规划时间和是否允许重新规划 self.arm.set_planning_time(10) self.arm.allow_replanning(True) planner_id = rospy.get_param("planner_id") # self.arm.set_planner_id(planner_id) # self.arm.set_planner_id("RRTstar") # self.arm.set_planner_id("SBL") # self.arm.set_planner_id("BKPIECE")#速度快,比较稳定 # self.arm.set_planner_id("BiEST")#还不错 # 设置允许的最大速度和加速度(范围:0~1) robot_params = { 'velocity_limits': True, 'joint_max_velocity': [3.541, 3.541, 3.733, 6.838, 4.815, 23.655], 'acceleration_limits': False, 'joint_max_acceleration': [0, 0, 0, 0, 0, 0] } self.arm.set_max_acceleration_scaling_factor(1) self.arm.set_max_velocity_scaling_factor(1) # 机械臂初始姿态 self.go_home() home_pose=self.get_now_pose() def get_now_pose(self): # 获取机械臂当前位姿 current_pose = self.arm.get_current_pose(self.end_effector_link).pose pose=[] pose.append(current_pose.position.x) pose.append(current_pose.position.y) pose.append(current_pose.position.z) pose.append(current_pose.orientation.x) pose.append(current_pose.orientation.y) pose.append(current_pose.orientation.z) pose.append(current_pose.orientation.w) # 打印位姿信息 #rospy.lohome_poseinfo("Current Pose: {}".format(pose)) return pose def close(self): moveit_commander.roscpp_shutdown() moveit_commander.os._exit(0) def create_path_constraints2(self,start_point,end_point,r):#创建路径约束 #计算起点指向终点的向量 vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]]) height = np.linalg.norm(vector)+0.006 #计算向量或矩阵的范数 用于度量向量或矩阵的大小或长度 radius = r # 位置约束 position_constraint = PositionConstraint()#创建点位约束 position_constraint.header.frame_id = "base_link"#此约束所指的机器人链接 position_constraint.link_name = self.end_effector_link position_constraint.target_point_offset = Vector3(0, 0, 0)#目标点的偏移量 position_constraint.weight = 1.0#质量 OR 加权因子? #构建 shape_msgs/SolidPrimitive 消息 bounding_volume = SolidPrimitive() bounding_volume.type = SolidPrimitive.CYLINDER#圆柱 # bounding_volume.dimensions = [0.003,1] bounding_volume.dimensions = [height,radius]#尺寸 高度 半径 position_constraint.constraint_region.primitives.append(bounding_volume) #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态 pose = Pose() pose.position.x = start_point[0] + vector[0] / 2#定义位置(找了个几何中心) pose.position.y = start_point[1] + vector[1] / 2 pose.position.z = start_point[2] + vector[2] / 2 # 计算圆柱体的姿态 z_axis = np.array([0, 0, 1]) axis = np.cross(z_axis, vector) #计算两个向量(向量数组)的叉乘 叉乘返回的数组既垂直于a,又垂直于b angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector))#反余弦求角度 q = tf.transformations.quaternion_about_axis(angle, axis)#通过旋转轴和旋转角返回四元数 pose.orientation.x = q[0] pose.orientation.y = q[1] pose.orientation.z = q[2] pose.orientation.w = q[3] position_constraint.constraint_region.primitive_poses.append(pose) constraints = Constraints()#创建一个新的约束信息 constraints.position_constraints.append(position_constraint) # constraints.orientation_constraints.append(orientation_constraint) return constraints def move_p_flexible(self,point,points,trajectory,trajectory_with_type,a=1,v=1): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) rrr=0.05#初始允许半径 if trajectory: #起点位置设置为规划组最后一个点 state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions # 创建路径约束 path_constraints = self.create_path_constraints2(points[-1],point,rrr)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 else: #起点位置设定为当前状态 按理来说是home点 self.go_home() self.home_pose=self.get_now_pose() #rospy.loginfo(self.home_pose) state = self.arm.get_current_state() # 创建路径约束 path_constraints = self.create_path_constraints2(self.home_pose,point,rrr)#将当前的点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 self.arm.set_pose_target(point, self.end_effector_link)#设定目标点为传入的点 self.arm.set_start_state(state)#起点位置设置为规划组最后一个点 或者当前状态(第一个点时) #当前起点规划次数 #尝试规划次数 b = 16 #尝试规划10次 for i in range(b): traj = self.arm.plan()#调用plan进行规划 trajj = traj[1] #取出规划的信息 error,Limit_margin = check.check_joint_positions(trajj)#检查关节状态 就是检测规划的轨迹有没有“摇花手” if not error:#如果没有摇花手 接受规划 rospy.loginfo("本次移动 OK") break else: rospy.loginfo("移动规划失败-开始第{}次重新规划".format(i+1)) if i%2==0 and i!=0:#10次了都没成功 重新修改约束 self.arm.clear_path_constraints() rrr=rrr+0.1#每次增加10厘米 path_constraints = self.create_path_constraints2(self.home_pose,point,rrr)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 #规划失败 if error: rospy.loginfo("{}次移动规划失败,进程终止".format(b)) sys.exit() #清除约束 self.arm.clear_path_constraints() trajj_with_type = mark_the_traj(trajj,"during-l",welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" trajectory_with_type.append(trajj_with_type) points.append(point) trajectory.append(trajj) return points,trajectory,trajectory_with_type # 测试程序用 def testRobot(self): try: rospy.loginfo("Test for robot...") file_path_result = os.path.join(folder_path, 'result.txt') all_data = process_welding_data(file_path_result) points,trajectory,trajectory_with_type = [],[],[] for i in range(len(all_data)): rospy.loginfo("共读取到%d条焊缝,开始规划第%d条",len(all_data),i+1) start_point = all_data[i][0] end_point = all_data[i][1] q1 = all_data[i][2] q2 = all_data[i][3] point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0],q1[1],q1[2],q1[3]] self.move_p(point11) point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0],q2[1],q2[2],q2[3]] self.move_p(point22) rospy.loginfo("第%d条焊缝规划完毕",i+1) # joint_position = [x * math.pi / 180 for x in [11.016, 3.716, -19.553, -5.375, -32.4173, 109.158]] # self.move_j(joint_position) # self.show_current_pose() print("Test OK") rospy.sleep(1) except Exception as e: print("Test fail! Exception:", str(e)) traceback.print_exc() # except: # print("Test fail! ") def show_current_pose(self): # 获取机械臂当前位姿 current_pose = self.arm.get_current_pose(self.end_effector_link).pose pose=[] pose.append(current_pose.position.x) pose.append(current_pose.position.y) pose.append(current_pose.position.z) pose.append(current_pose.orientation.x) pose.append(current_pose.orientation.y) pose.append(current_pose.orientation.z) # 打印位姿信息 rospy.loginfo("Current Pose: {}".format(pose)) def set_scene(self): rospy.loginfo("set scene") ## set table self.scene = PlanningSceneInterface() self.scene_pub = rospy.Publisher('planning_scene', PlanningScene, queue_size=5) self.colors = dict() # rospy.sleep(1) table_id = 'table' self.scene.remove_world_object(table_id) rospy.sleep(1) table_size = [2, 2, 0.01] table_pose = PoseStamped() table_pose.header.frame_id = self.reference_frame table_pose.pose.position.x = 0.0 table_pose.pose.position.y = 0.0 table_pose.pose.position.z = -table_size[2]/2 table_pose.pose.orientation.w = 1.0 box2_id = 'box2' self.scene.remove_world_object(box2_id) rospy.sleep(1) box2_size = [0.01, 0.6, 0.4] box2_pose = PoseStamped() box2_pose.header.frame_id = self.reference_frame box2_pose.pose.position.x = 1.15194-box2_size[0]/2-0.005 box2_pose.pose.position.y = -0.707656 box2_pose.pose.position.z = 0.4869005 box2_pose.pose.orientation.w = 1.0 box3_id = 'box3' self.scene.remove_world_object(box3_id) rospy.sleep(1) box3_size = [0.2, 0.01, 0.4] box3_pose = PoseStamped() box3_pose.header.frame_id = self.reference_frame box3_pose.pose.position.x = 1.15194+box3_size[0]/2 box3_pose.pose.position.y = -0.707656-box3_size[1]/2-0.005 box3_pose.pose.position.z = 0.4869005 box3_pose.pose.orientation.w = 1.0 # self.scene.add_box(table_id, table_pose, table_size) # self.scene.add_box(box2_id, box2_pose, box2_size) # self.scene.add_box(box3_id, box3_pose, box3_size) self.setColor(table_id, 0.5, 0.5, 0.5, 1.0) self.sendColors() rospy.loginfo("set scene end") def plan_cartesian_path(self,waypoints): fraction = 0.0 # 路径规划覆盖率 maxtries = 100 # 最大尝试规划次数 attempts = 0 # 已经尝试规划次数 # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点 while fraction < 1.0 and attempts < maxtries: (plan, fraction) = self.arm.compute_cartesian_path( waypoints, # waypoint poses,路点列表 0.001, # eef_step,终端步进值 0.0, # jump_threshold,跳跃阈值 True) # avoid_collisions,避障规划 attempts += 1 if fraction == 1.0: rospy.loginfo("Path computed successfully.") traj = plan else: rospy.loginfo( "Path planning failed with only " + str(fraction) + " success after " + str(maxtries) + " attempts.") return fraction,plan # 关节规划,输入6个关节角度(单位:弧度) def move_j(self, joint_configuration=None,a=1,v=1): # 设置机械臂的目标位置,使用六轴的位置数据进行描述(单位:弧度) if joint_configuration==None: joint_configuration = [0, -1.5707, 0, -1.5707, 0, 0] self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) self.arm.set_joint_value_target(joint_configuration) rospy.loginfo("move_j:"+str(joint_configuration)) self.arm.go() rospy.sleep(1) # 空间规划,输入xyzRPY def move_p(self, tool_configuration=None,a=1,v=1): if tool_configuration==None: tool_configuration = [0.3,0,0.3,0,-np.pi/2,0] self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) target_pose = PoseStamped() target_pose.header.frame_id = 'base_link' target_pose.header.stamp = rospy.Time.now() target_pose.pose.position.x = tool_configuration[0] target_pose.pose.position.y = tool_configuration[1] target_pose.pose.position.z = tool_configuration[2] if len(tool_configuration) == 6: q = quaternion_from_euler(tool_configuration[3],tool_configuration[4],tool_configuration[5]) target_pose.pose.orientation.x = q[0] target_pose.pose.orientation.y = q[1] target_pose.pose.orientation.z = q[2] target_pose.pose.orientation.w = q[3] else: target_pose.pose.orientation.x = tool_configuration[3] target_pose.pose.orientation.y = tool_configuration[4] target_pose.pose.orientation.z = tool_configuration[5] target_pose.pose.orientation.w = tool_configuration[6] self.arm.set_start_state_to_current_state() self.arm.set_pose_target(target_pose, self.end_effector_link) rospy.loginfo("move_p:" + str(tool_configuration)) traj = self.arm.plan() path = traj[1] # file_path = "/home/chen/catkin_ws/src/lstrobot_planning/date/p_traj.txt" # if os.path.exists(file_path): # os.remove(file_path) # with open(file_path, "w") as file: # file.write(str(path)) self.arm.execute(path) # rospy.sleep(1) def move_p2(self,point,points,trajectory,trajectory_with_type,a=1,v=1): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) # self.arm.set_start_state_to_current_state() if trajectory: state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) self.arm.set_pose_target(point, self.end_effector_link) rospy.loginfo("move_p:" + str(point)) traj = self.arm.plan() trajj = traj[1] error_code =traj[3] rospy.loginfo("Move to start point planned successfully") trajj_with_type = mark_the_traj(trajj,"during-p",welding_sequence) points.append(point) trajectory.append(trajj) trajectory_with_type.append(trajj_with_type) return points,trajectory,trajectory_with_type def move_pl(self,point,points,trajectory,trajectory_with_type,a=1,v=1): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) #设定约束 if trajectory: #起点位置设置为规划组最后一个点 state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state)#起点位置设置为规划组最后一个点 self.arm.set_pose_target(point, self.end_effector_link)#设定目标点为传入的点 # 创建路径约束 path_constraints = self.create_path_constraints(points[-1],point)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 #当前起点规划次数 #尝试规划次数 b = 20 #最多尝试规划10次 for i in range(b): traj = self.arm.plan()#调用plan进行规划 trajj = traj[1] #取出规划的信息 error,Limit_margin = check.check_joint_positions(trajj)#检查关节状态 就是检测规划的轨迹有没有“摇花手” if not error:#如果没有摇花手 接受规划 rospy.loginfo("本次焊缝规划 OK") break else: rospy.loginfo("焊缝规划失败-开始第{}次重新规划".format(i+1)) # if i%10==0 and i!=0:#10次了都没成功 将这个点用p2规划 # prePoint = points.pop()#移除列表中最后一个元素 返回被移除的对象 # trajectory.pop() # trajectory_with_type.pop() # #清除约束 # self.arm.clear_path_constraints() # points,trajectory,trajectory_with_type = self.move_p2(prePoint,points,trajectory,trajectory_with_type) # rospy.loginfo("第{}次尝试使用p2".format((i/10)%1)) #规划失败 if error: rospy.loginfo("{}次焊缝规划失败,进程终止".format(b)) sys.exit() #清除约束 self.arm.clear_path_constraints() trajj_with_type = mark_the_traj(trajj,"during-l",welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" #如果调用了P2函数 那么这段代码不会导致重复添加? points.append(point) trajectory.append(trajj) trajectory_with_type.append(trajj_with_type) return points,trajectory,trajectory_with_type def create_path_constraints(self,start_point,end_point): #计算起点指向终点的向量 vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]]) height = np.linalg.norm(vector)+0.002 radius = 0.001 constraints = Constraints() # 位置约束 position_constraint = PositionConstraint() position_constraint.header.frame_id = "base_link" position_constraint.link_name = self.end_effector_link position_constraint.target_point_offset = Vector3(0, 0, 0) #构建 shape_msgs/SolidPrimitive 消息 bounding_volume = SolidPrimitive() bounding_volume.type = SolidPrimitive.CYLINDER # bounding_volume.dimensions = [0.003,1] bounding_volume.dimensions = [height,radius] #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态 pose = Pose() pose.position.x = start_point[0] + vector[0] / 2 pose.position.y = start_point[1] + vector[1] / 2 pose.position.z = start_point[2] + vector[2] / 2 # 计算圆柱体的旋转矩阵 z_axis = np.array([0, 0, 1]) axis = np.cross(z_axis, vector) angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector)) q = tf.transformations.quaternion_about_axis(angle, axis) pose.orientation.x = q[0] pose.orientation.y = q[1] pose.orientation.z = q[2] pose.orientation.w = q[3] position_constraint.constraint_region.primitives.append(bounding_volume) position_constraint.constraint_region.primitive_poses.append(pose) position_constraint.weight = 1.0 # 姿态约束 # orientation_constraint = OrientationConstraint() # orientation_constraint.header.frame_id = "base_link" # orientation_constraint.link_name = self.end_effector_link # orientation_constraint.orientation = target_pose.orientation # orientation_constraint.absolute_x_axis_tolerance = 0.01 # orientation_constraint.absolute_y_axis_tolerance = 0.01 # orientation_constraint.absolute_z_axis_tolerance = 0.01 # orientation_constraint.weight = 1.0 constraints.position_constraints.append(position_constraint) # constraints.orientation_constraints.append(orientation_constraint) return constraints def move_l(self, tool_configuration,waypoints_number=1,a=0.5,v=0.5): if tool_configuration==None: tool_configuration = [0.3,0,0.3,0,-np.pi/2,0] self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) # 设置路点 waypoints = [] for i in range(waypoints_number): target_pose = PoseStamped() target_pose.header.frame_id = self.reference_frame target_pose.header.stamp = rospy.Time.now() target_pose.pose.position.x = tool_configuration[6*i+0] target_pose.pose.position.y = tool_configuration[6*i+1] target_pose.pose.position.z = tool_configuration[6*i+2] q = quaternion_from_euler(tool_configuration[6*i+3],tool_configuration[6*i+4],tool_configuration[6*i+5]) target_pose.pose.orientation.x = q[0] target_pose.pose.orientation.y = q[1] target_pose.pose.orientation.z = q[2] target_pose.pose.orientation.w = q[3] waypoints.append(target_pose.pose) rospy.loginfo("move_l:" + str(tool_configuration)) self.arm.set_start_state_to_current_state() fraction = 0.0 # 路径规划覆盖率 maxtries = 100 # 最大尝试规划次数 attempts = 0 # 已经尝试规划次数 # 设置机器臂当前的状态作为运动初始状态 self.arm.set_start_state_to_current_state() # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点 while fraction < 1.0 and attempts < maxtries: (plan, fraction) = self.arm.compute_cartesian_path( waypoints, # waypoint poses,路点列表 0.001, # eef_step,终端步进值 0.00, # jump_threshold,跳跃阈值 True) # avoid_collisions,避障规划 attempts += 1 if fraction == 1.0: rospy.loginfo("Path computed successfully. Moving the arm.") self.arm.execute(plan) rospy.loginfo("Path execution complete.") else: rospy.loginfo( "Path planning failed with only " + str(fraction) + " success after " + str(maxtries) + " attempts.") rospy.sleep(1) def move_l22(self,point,points,trajectory,trajectory_with_type,a=0.5,v=0.5): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) if trajectory: state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) start_rotation_angle,end_rotation_angle = 0,0 new_start_pose,new_end_pose = 0,0 num_planed = 0 while not start_rotation_angle == 2*pi: waypoints = [] target_pose = point_to_PoseStamped(point,self.reference_frame) waypoints.append(target_pose.pose) rospy.loginfo("move_l2:" ) num_remake = 2 for _ in range(num_remake): num_planed +=1 rospy.loginfo("第{}次规划".format(num_planed)) rospy.loginfo("new_start_pose:{} new_end_pose:{}".format(new_start_pose,new_end_pose)) fraction,traj = self.plan_cartesian_path(waypoints) if fraction == 1: error,percentage = check.check_joint_positions(traj) if not error: break if fraction != 1: break error,percentage = check.check_joint_positions(traj) if error: #三次重规划结束,仍然不符合,插点或转点 angle_list = ([0,pi/4,-pi/4,pi/2,-pi/2,pi*3/4,-pi*3/4,pi,2*pi]) if start_rotation_angle < pi: if end_rotation_angle < pi: index = angle_list.index(end_rotation_angle) end_rotation_angle = angle_list[index+1] new_end_pose = check.angle_adjustment(point,'z', start_rotation_angle) # 将目标位姿旋转一定角度 point = new_end_pose # 将新的点替换原来位姿 else: index = angle_list.index(start_rotation_angle) start_rotation_angle = angle_list[index+1] new_start_pose = check.angle_adjustment(points[-1],'z', start_rotation_angle) # 将目标位姿旋转一定角度 #go home,plan p state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[0].positions self.arm.set_start_state(state) del points[-1] del trajectory[-1] points,trajectory,trajectory_with_type = self.move_p2(new_start_pose,points,trajectory,trajectory_with_type) state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) end_rotation_angle = 0 else: break if fraction != 1: rospy.loginfo("Plan Fail,Error code 1") sys.exit() elif error: rospy.loginfo("Plan Fail,Error code 2") sys.exit() else: traj_with_type = mark_the_traj(traj,"during-l",welding_sequence) traj_with_type.points[-len(traj.joint_trajectory.points)].type = "start" traj_with_type.points[-1].type = "end" points.append(point) trajectory.append(traj) trajectory_with_type.append(traj_with_type) return points,trajectory,trajectory_with_type def go_home(self,a=1,v=1): rospy.loginfo("go_home start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) # “up”为自定义姿态,你可以使用“home”或者其他姿态 self.arm.set_named_target('home') self.arm.go() rospy.loginfo("go_home end") # rospy.sleep(1) def go_home_justplan(self,trajectory,trajectory_with_type,a=1,v=1): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) # “up”为自定义姿态,你可以使用“home”或者其他姿态 state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) self.arm.set_named_target('home') traj = self.arm.plan() trajj = traj[1] traj_with_type = mark_the_traj(trajj,"go-home",welding_sequence) trajectory.append(trajj) trajectory_with_type.append(traj_with_type) return trajectory,trajectory_with_type def setColor(self, name, r, g, b, a=0.9): # 初始化moveit颜色对象 color = ObjectColor() # 设置颜色值 color.id = name color.color.r = r color.color.g = g color.color.b = b color.color.a = a # 更新颜色字典 self.colors[name] = color # 将颜色设置发送并应用到moveit场景当中 def sendColors(self): # 初始化规划场景对象 p = PlanningScene() # 需要设置规划场景是否有差异 p.is_diff = True # 从颜色字典中取出颜色设置 for color in self.colors.values(): p.object_colors.append(color) # 发布场景物体颜色设置 self.scene_pub.publish(p) def path_planning(self,folder_path,gohome=True): file_path_result = os.path.join(folder_path, 'result.txt') all_data = process_welding_data(file_path_result) points,trajectory,trajectory_with_type = [],[],[] for i in range(len(all_data)): rospy.loginfo("共读取到%d条焊缝,开始规划第%d条",len(all_data),i+1) start_point = all_data[i][0] end_point = all_data[i][1] q1 = all_data[i][2] q2 = all_data[i][3] point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0],q1[1],q1[2],q1[3]] points,trajectory,trajectory_with_type = self.move_p_flexible(point11,points,trajectory,trajectory_with_type) point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0],q2[1],q2[2],q2[3]] points,trajectory,trajectory_with_type = self.move_pl(point22,points,trajectory,trajectory_with_type) rospy.loginfo("第%d条焊缝规划完毕",i+1) if gohome: points,trajectory,trajectory_with_type = self.move_p_flexible(self.home_pose,points,trajectory,trajectory_with_type) trajectory,trajectory_with_type = self.go_home_justplan(trajectory,trajectory_with_type) traj_merge= merge_trajectories(trajectory) trajectory_with_type_merge= merge_trajectories_with_type(trajectory_with_type) rospy.loginfo("All of The trajectory Plan successfully") return trajectory,traj_merge,trajectory_with_type_merge def path_planning_test(self,folder_path,i,reverse=False,gohome=True): file_path_result = os.path.join(folder_path, 'result.txt') all_data = process_welding_data(file_path_result) points,trajectory,trajectory_with_type = [],[],[] i -= 1 rospy.loginfo("共读取到%d条焊缝,开始规划第%d条",len(all_data),i+1) start_point = all_data[i][0] end_point = all_data[i][1] q1 = all_data[i][2] q2 = all_data[i][3] if reverse: start_point = all_data[i][1] end_point = all_data[i][0] q1 = all_data[i][3] q2 = all_data[i][2] point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0],q1[1],q1[2],q1[3]] points,trajectory,trajectory_with_type = self.move_p2(point11,points,trajectory,trajectory_with_type) point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0],q2[1],q2[2],q2[3]] points,trajectory,trajectory_with_type = self.move_pl(point22,points,trajectory,trajectory_with_type) rospy.loginfo("第%d条焊缝规划完毕",i+1) if gohome: trajectory,trajectory_with_type = self.go_home_justplan(trajectory,trajectory_with_type) traj_merge= merge_trajectories(trajectory) trajectory_with_type_merge= merge_trajectories_with_type(trajectory_with_type) rospy.loginfo("All of The trajectory Plan successfully") return trajectory,traj_merge,trajectory_with_type_merge def point_to_PoseStamped(point,reference_frame): target_pose = PoseStamped() target_pose.header.frame_id = reference_frame target_pose.header.stamp = rospy.Time.now() target_pose.pose.position.x = point[0] target_pose.pose.position.y = point[1] target_pose.pose.position.z = point[2] if len(point) == 6: q = quaternion_from_euler(point[3],point[4],point[5]) target_pose.pose.orientation.x = q[0] target_pose.pose.orientation.y = q[1] target_pose.pose.orientation.z = q[2] target_pose.pose.orientation.w = q[3] else: target_pose.pose.orientation.x = point[3] target_pose.pose.orientation.y = point[4] target_pose.pose.orientation.z = point[5] target_pose.pose.orientation.w = point[6] return target_pose def calculate_waypoints(start_pose,end_pose,spacing): start_position = np.array([start_pose.pose.position.x, start_pose.pose.position.y, start_pose.pose.position.z]) start_orientation = euler_from_quaternion([start_pose.pose.orientation.x,start_pose.pose.orientation.y,start_pose.pose.orientation.z,start_pose.pose.orientation.w]) end_position = np.array([end_pose.pose.position.x, end_pose.pose.position.y, end_pose.pose.position.z]) end_orientation = euler_from_quaternion([end_pose.pose.orientation.x,end_pose.pose.orientation.y,end_pose.pose.orientation.z,end_pose.pose.orientation.w]) distance = np.linalg.norm(np.array(end_position) - np.array(start_position)) num_points = int(distance/spacing) trajectory = [] for i in range(num_points + 1): alpha = float(i) / num_points position = [start + alpha * (end - start) for start, end in zip(start_position, end_position)] orientation = [start + alpha * (end - start) for start, end in zip(start_orientation, end_orientation)] trajectory.append(position + orientation) return trajectory,num_points + 1 def process_welding_data(filename): all_data = [] # 用来存储每一行处理后的数据 with open(filename, 'r') as file: for line in file: parts = line.strip().split('/') coordinates_and_poses = [part.split(',') for part in parts[1:]] start_point = tuple(map(float, coordinates_and_poses[0][:3])) # 使用元组以避免修改 end_point = tuple(map(float, coordinates_and_poses[1][:3])) q1 = tuple(map(float, coordinates_and_poses[2][:4])) q2 = tuple(map(float, coordinates_and_poses[3][:4])) # 收集每行处理后的数据 all_data.append((start_point, end_point, q1, q2)) return all_data def mark_the_traj(traj,TYPE,SEQUENCE): traj_with_type = JointTrajectory_ex() traj_with_type.header = traj.joint_trajectory.header traj_with_type.joint_names = traj.joint_trajectory.joint_names traj_with_type.points = [ JointTrajectoryPoint_ex( positions=point.positions, velocities=point.velocities, accelerations=point.accelerations, effort=point.effort, type=TYPE, sequence=SEQUENCE ) for point in traj.joint_trajectory.points ] return traj_with_type def merge_trajectories(trajectories): if not trajectories: return None # 创建一个新的 JointTrajectory 对象 merged_trajectory = JointTrajectory() merged_trajectory.header = trajectories[0].joint_trajectory.header merged_trajectory.joint_names = trajectories[0].joint_trajectory.joint_names # 初始化时间累加器 last_time_from_start = rospy.Duration(0) # 合并所有 trajectories 的 points for traj in trajectories: for point in traj.joint_trajectory.points: # 创建新的轨迹点 new_point = deepcopy(point) # 累加时间 new_point.time_from_start += last_time_from_start merged_trajectory.points.append(new_point) # 更新时间累加器为当前轨迹的最后一个点的时间 if traj.joint_trajectory.points: last_time_from_start = traj.joint_trajectory.points[-1].time_from_start + last_time_from_start return merged_trajectory def merge_trajectories_with_type(trajectory_with_type): if not trajectory_with_type: return None # 创建一个新的 JointTrajectory 对象 merged_trajectory_with_type = JointTrajectory_ex() merged_trajectory_with_type.header = trajectory_with_type[0].header merged_trajectory_with_type.joint_names = trajectory_with_type[0].joint_names # 初始化时间累加器 last_time_from_start = rospy.Duration(0) # 合并所有 trajectories 的 points for traj in trajectory_with_type: for point in traj.points: # 创建新的轨迹点 new_point = deepcopy(point) # 累加时间 new_point.time_from_start += last_time_from_start merged_trajectory_with_type.points.append(new_point) # 更新时间累加器为当前轨迹的最后一个点的时间 if traj.points: last_time_from_start = traj.points[-1].time_from_start + last_time_from_start return merged_trajectory_with_type def pub_trajectories(trajectory_with_type_merge): pub = rospy.Publisher("/joint_path", JointTrajectory_ex, queue_size=5) count = 1 rate = rospy.Rate(1) rospy.loginfo("正在发布轨迹信息..") while not rospy.is_shutdown(): #轨迹接受完毕,关闭当前发布 sign_traj_accepted = str(rospy.get_param("sign_traj_accepted")) if sign_traj_accepted == "1": rospy.set_param("sign_traj_accepted",0) break pub.publish(trajectory_with_type_merge) # rospy.loginfo("发布计数,count = %d",count) # count += 1 rate.sleep() if __name__ =="__main__": folder_path = rospy.get_param("folder_path") moveit_server = MoveIt_Control(is_use_gripper=False) moveit_server.arm.set_num_planning_attempts(10) #获取规划信息,并规划 # folder_path = ("/home/chen/catkin_ws/data/0412") welding_sequence = rospy.get_param('welding_sequence') trajectory,trajectory_merge,trajectory_with_type_merge = moveit_server.path_planning(folder_path) # trajectory,trajectory_merge,trajectory_with_type_merge = moveit_server.path_planning_test(folder_path,1,True) #执行动作 moveit_server.arm.execute(trajectory_merge) # for i in range(1 * 2 - 0): #执行到第i条焊缝 # moveit_server.arm.execute(trajectory[i]) #发布规划完毕的轨迹信息 pub_trajectories(trajectory_with_type_merge)