# 导入基本ros和moveit库 from logging.handlers import RotatingFileHandler import os import moveit_msgs.msg import rospy, sys import moveit_commander import moveit_msgs from moveit_commander import MoveGroupCommander, PlanningSceneInterface, RobotCommander from moveit_msgs.msg import PlanningScene, ObjectColor,CollisionObject, AttachedCollisionObject,RobotTrajectory from moveit_msgs.msg import RobotState from moveit_msgs.msg import Constraints, PositionConstraint, OrientationConstraint from shape_msgs.msg import SolidPrimitive from sensor_msgs.msg import JointState import tf.transformations from trajectory_msgs.msg import JointTrajectory,JointTrajectoryPoint from geometry_msgs.msg import PoseStamped, Pose,Point,Vector3 from copy import deepcopy import numpy as np import tf from tf.transformations import euler_from_quaternion,quaternion_from_euler,quaternion_from_euler, quaternion_multiply import math import traceback from lstrobot_planning.msg import JointTrajectoryPoint_ex,JointTrajectory_ex import json import termios from decorator_time import decorator_time import check pi = np.pi class MoveIt_Control: # 初始化程序 def __init__(self, is_use_gripper=False): self.home_pose=[] self.hf_num=0#焊缝计数 从0开始 self.hf_fail=[] # Init ros config moveit_commander.roscpp_initialize(sys.argv) self.robot = moveit_commander.RobotCommander() self.arm = moveit_commander.MoveGroupCommander('manipulator') self.arm.set_goal_joint_tolerance(0.0001) self.arm.set_goal_position_tolerance(0.0005) self.arm.set_goal_orientation_tolerance(0.1) #self.arm.set_planner_id("RRTConnet")#不知道能不能用 self.end_effector_link = self.arm.get_end_effector_link() # 设置机械臂基座的参考系 self.reference_frame = 'base_link' self.arm.set_pose_reference_frame(self.reference_frame) # # 设置最大规划时间和是否允许重新规划 self.arm.set_planning_time(10.0) self.arm.allow_replanning(True) self.arm.set_max_acceleration_scaling_factor(1) self.arm.set_max_velocity_scaling_factor(1) self.arm.set_num_planning_attempts(10) # 机械臂初始姿态 #self.move_j() self.go_home() self.home_pose=self.get_now_pose() #self.go_ready() #TODO plan_cartesian_path def plan_cartesian_path_LLL(self,point_s,point_e,waypoints,trajectory,trajectory_with_type,a=1,v=1): fraction = 0.0 # 路径规划覆盖率 maxtries = 50 # 最大尝试规划次数 attempts = 0 # 已经尝试规划次数 #起点位置设置为规划组最后一个点 start_pose = self.arm.get_current_pose(self.end_effector_link).pose wpose = deepcopy(start_pose) waypoint=[] #print(waypoint) if waypoints: wpose.position.x=waypoints[-1].position.x wpose.position.y=waypoints[-1].position.y wpose.position.z=waypoints[-1].position.z wpose.orientation.x=waypoints[-1].orientation.x wpose.orientation.y=waypoints[-1].orientation.y wpose.orientation.z=waypoints[-1].orientation.z wpose.orientation.w=waypoints[-1].orientation.w waypoint.append(deepcopy(wpose)) #wpose = deepcopy(pose) wpose.position.x=point_s[0] wpose.position.y=point_s[1] wpose.position.z=point_s[2] wpose.orientation.x=point_s[3] wpose.orientation.y=point_s[4] wpose.orientation.z=point_s[5] wpose.orientation.w=point_s[6] waypoint.append(deepcopy(wpose)) waypoints.append(deepcopy(wpose)) wpose.position.x=point_e[0] wpose.position.y=point_e[1] wpose.position.z=point_e[2] wpose.orientation.x=point_e[3] wpose.orientation.y=point_e[4] wpose.orientation.z=point_e[5] wpose.orientation.w=point_e[6] waypoint.append(deepcopy(wpose)) waypoints.append(deepcopy(wpose)) # 尝试规划一条笛卡尔空间下的路径,依次通过所有路点 while fraction < 1.0 and attempts < maxtries: (plan, fraction) = self.arm.compute_cartesian_path( waypoint, # waypoint poses,路点列表 0.001, # eef_step,终端步进值 #0.0, # jump_threshold,跳跃阈值 True, None) # avoid_collisions,避障规划 attempts += 1 if fraction == 1.0 : rospy.loginfo("Path computed successfully.") #traj = plan trajj = plan #取出规划的信息 retimed_planed=self.retime_plan(trajj) trajj_with_type = mark_the_traj(trajj,"during-l",welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" trajectory.append(trajj) rospy.loginfo("开始执行") moveit_server.arm.execute(retimed_planed) rospy.loginfo("执行结束") trajectory_with_type.append(trajj_with_type) else: rospy.loginfo( "fraction=" + str(fraction) + " success after " + str(maxtries)) # rospy.loginfo("本次焊缝规划失败") # points.pop()#将本条焊缝的起点从规划中删除 # trajectory.pop() # trajectory_with_type.pop() # self.hf_fail.append(welding_sequence[self.hf_num])#将焊缝添加到失败列表 #self.hf_num=self.hf_num+1#焊缝计数 return waypoints,trajectory,trajectory_with_type def retime_plan(self,traj): current_state = self.arm.get_current_state() retimed_traj = self.arm.retime_trajectory( current_state, traj, velocity_scaling_factor=0.3, acceleration_scaling_factor=1) return retimed_traj def get_now_pose(self): # 获取机械臂当前位姿 current_pose = self.arm.get_current_pose(self.end_effector_link).pose pose=[] pose.append(current_pose.position.x) pose.append(current_pose.position.y) pose.append(current_pose.position.z) pose.append(current_pose.orientation.x) pose.append(current_pose.orientation.y) pose.append(current_pose.orientation.z) pose.append(current_pose.orientation.w) # 打印位姿信息 #rospy.lohome_poseinfo("Current Pose: {}".format(pose)) return pose #TODO move_p_path_constraints def create_path_constraints2(self,start_point,end_point,r):#创建路径约束 #计算起点指向终点的向量 vector = np.array([end_point[0]- start_point[0], end_point[1]- start_point[1], end_point[2]- start_point[2]]) height = np.linalg.norm(vector)+0.16 #高度延长16cm radius = r # 位置约束 position_constraint = PositionConstraint()#创建点位约束 position_constraint.header.frame_id = "base_link"#此约束所指的机器人链接 position_constraint.link_name = self.end_effector_link position_constraint.target_point_offset = Vector3(0, 0, 0)#目标点的偏移量 position_constraint.weight = 1.0#质量 OR 加权因子? #构建 shape_msgs/SolidPrimitive 消息 bounding_volume = SolidPrimitive() bounding_volume.type = SolidPrimitive.CYLINDER#圆柱 # bounding_volume.dimensions = [0.003,1] bounding_volume.dimensions = [height,radius]#尺寸 高度 半径 position_constraint.constraint_region.primitives.append(bounding_volume) #构建 geometry_msgs/Pose 消息,用于指定圆柱体在空间中的位置和姿态 pose = Pose() pose.position.x = start_point[0] + vector[0] / 2#定义位置(找了个几何中心) pose.position.y = start_point[1] + vector[1] / 2 pose.position.z = start_point[2] + vector[2] / 2 # 计算圆柱体的姿态 z_axis = np.array([0, 0, 1]) axis = np.cross(z_axis, vector) #计算两个向量(向量数组)的叉乘 叉乘返回的数组既垂直于a,又垂直于b angle = np.arccos(np.dot(z_axis, vector) / np.linalg.norm(vector))#反余弦求角度 q = tf.transformations.quaternion_about_axis(angle, axis)#通过旋转轴和旋转角返回四元数 pose.orientation.x = q[0] pose.orientation.y = q[1] pose.orientation.z = q[2] pose.orientation.w = q[3] position_constraint.constraint_region.primitive_poses.append(pose) constraints = Constraints()#创建一个新的约束信息 constraints.position_constraints.append(position_constraint) # constraints.orientation_constraints.append(orientation_constraint) return constraints #TODO move_p_flexible def move_p_flexible(self,point,points,trajectory,trajectory_with_type,a=1,v=1): self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) rrr=0.09#初始允许半径 9cm er=0 if trajectory: #起点位置设置为规划组最后一个点 state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions path_constraints = self.create_path_constraints2(points[-1],point,rrr)#将当前的点和输入的点(焊缝的起始点)做圆柱轨迹约束 else: #刚开始规划 起点位置设定为当前状态 按理来说是home点 self.go_home() self.home_pose=self.get_now_pose() state = self.arm.get_current_state() path_constraints = self.create_path_constraints2(self.home_pose,point,rrr)#将当前的点和输入的点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 self.arm.set_pose_target(point, self.end_effector_link)#设定目标点为传入的点 self.arm.set_start_state(state)#起点位置设置为规划组最后一个点 或者当前状态(第一个点时) #尝试规划次数 b = 4 #尝试规划5次 无约束5次 for i in range(b): traj = self.arm.plan()#调用plan进行规划 error=traj[3] if error.val == moveit_msgs.msg._MoveItErrorCodes.MoveItErrorCodes.SUCCESS: trajj = traj[1] #取出规划的信息 rospy.loginfo("正在检查移动轨迹有效性...") error_c, limit_margin=check.check_joint_positions(trajj) if not error_c: rospy.loginfo("本次移动OK") rospy.loginfo("*******************") trajj_with_type = mark_the_traj(trajj,"during-p",welding_sequence) trajj_with_type.points[-len(trajj.joint_trajectory.points)].type = "start" trajj_with_type.points[-1].type = "end" trajectory_with_type.append(trajj_with_type) points.append(point) trajectory.append(trajj) break else: rospy.loginfo("check failed! 移动轨迹无效") rospy.loginfo("移动规划失败-开始第{}次重新规划".format(i+1)) self.arm.clear_path_constraints() rrr=rrr+0.2#每次增加20厘米半径 rospy.loginfo("R值:{}".format(rrr)) if trajectory: path_constraints = self.create_path_constraints2(points[-1],point,rrr)#将最后一个点和输入的点(焊缝的起始点)做圆柱轨迹约束 else: path_constraints = self.create_path_constraints2(self.home_pose,point,rrr)#将最后一个点和起点(焊缝的起始点)做圆柱轨迹约束 self.arm.set_path_constraints(path_constraints)#设定约束 if(i>=(b-2)): rospy.loginfo("所有移动规划尝试失败 取消路径约束") self.arm.clear_path_constraints() if(i==(b-1)): rospy.loginfo("所有移动规划尝试失败 焊缝起点不可达") er=1 break #清除约束 self.arm.clear_path_constraints() return points,trajectory,trajectory_with_type,er #TODO go_home @decorator_time def go_home(self,a=1,v=1): rospy.loginfo("go_home start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) self.arm.set_named_target('home') rospy.loginfo("get_home end") self.arm.go() rospy.loginfo("go_home end") # rospy.sleep(1) def go_ready(self,a=1,v=1): rospy.loginfo("go_ready start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) self.arm.set_named_target('ready') rospy.loginfo("get_ready end") self.arm.go() rospy.loginfo("go_ready end") #TODO go_home_justplan @decorator_time def go_home_justplan(self,trajectory,trajectory_with_type,a=1,v=1): rospy.loginfo("go_home start") self.arm.set_max_acceleration_scaling_factor(a) self.arm.set_max_velocity_scaling_factor(v) state = self.arm.get_current_state() state.joint_state.position = trajectory[-1].joint_trajectory.points[-1].positions self.arm.set_start_state(state) self.arm.set_named_target('home') traj = self.arm.plan() trajj = traj[1] moveit_server.arm.execute(trajj) rospy.loginfo("go_home end") traj_with_type = mark_the_traj(trajj,"go-home",welding_sequence) trajectory.append(trajj) trajectory_with_type.append(traj_with_type) return trajectory,trajectory_with_type #TODO path_planning @decorator_time def path_planning(self,folder_path,gohome=True): file_path_result = os.path.join(folder_path, 'result.txt') all_data = process_welding_data(file_path_result) way_points,trajectory,trajectory_with_type = [],[],[] for i in range(len(all_data)): rospy.loginfo("共读取到%d条焊缝,开始规划第%d条",len(all_data),i+1) start_point = all_data[i][0] end_point = all_data[i][1] q1 = all_data[i][2] q2 = all_data[i][3] point11 = [start_point[0]/1000, start_point[1]/1000, start_point[2]/1000, q1[0],q1[1],q1[2],q1[3]] point22 = [end_point[0]/1000, end_point[1]/1000, end_point[2]/1000, q2[0],q2[1],q2[2],q2[3]] way_points,trajectory,trajectory_with_type = self.plan_cartesian_path_LLL(point11,point22,way_points,trajectory,trajectory_with_type,v=speed_v) rospy.loginfo("第%d条焊缝规划完毕",i+1) rospy.loginfo("*******************") rospy.loginfo("All of The trajectory Plan successfully") rospy.loginfo("*******************") if gohome: #trajectory,trajectory_with_type= self.move_p_flexible(trajectory,trajectory_with_type) trajectory,trajectory_with_type = self.go_home_justplan(trajectory,trajectory_with_type) traj_merge= merge_trajectories(trajectory) trajectory_with_type_merge= merge_trajectories_with_type(trajectory_with_type) return trajectory,traj_merge,trajectory_with_type_merge ###########################################################class end############################################################# def process_welding_data(filename): all_data = [] # 用来存储每一行处理后的数据 with open(filename, 'r') as file: for line in file: parts = line.strip().split('/') coordinates_and_poses = [part.split(',') for part in parts[1:]] start_point = tuple(map(float, coordinates_and_poses[0][:3])) # 使用元组以避免修改 end_point = tuple(map(float, coordinates_and_poses[1][:3])) q1 = tuple(map(float, coordinates_and_poses[2][:4])) q2 = tuple(map(float, coordinates_and_poses[3][:4])) # 收集每行处理后的数据 all_data.append((start_point, end_point, q1, q2)) return all_data def mark_the_traj(traj,TYPE,SEQUENCE): traj_with_type = JointTrajectory_ex() traj_with_type.header = traj.joint_trajectory.header traj_with_type.joint_names = traj.joint_trajectory.joint_names traj_with_type.points = [ JointTrajectoryPoint_ex( positions=point.positions, velocities=point.velocities, accelerations=point.accelerations, effort=point.effort, type=TYPE, sequence=SEQUENCE ) for point in traj.joint_trajectory.points ] return traj_with_type def merge_trajectories(trajectories): if not trajectories: return None # 创建一个新的 JointTrajectory 对象 merged_trajectory = JointTrajectory() merged_trajectory.header = trajectories[0].joint_trajectory.header merged_trajectory.joint_names = trajectories[0].joint_trajectory.joint_names # 初始化时间累加器 last_time_from_start = rospy.Duration(0) # 合并所有 trajectories 的 points for traj in trajectories: for point in traj.joint_trajectory.points: # 创建新的轨迹点 new_point = deepcopy(point) # 累加时间 new_point.time_from_start += last_time_from_start merged_trajectory.points.append(new_point) # 更新时间累加器为当前轨迹的最后一个点的时间 if traj.joint_trajectory.points: last_time_from_start = traj.joint_trajectory.points[-1].time_from_start + last_time_from_start return merged_trajectory def merge_trajectories_with_type(trajectory_with_type): if not trajectory_with_type: return None # 创建一个新的 JointTrajectory 对象 merged_trajectory_with_type = JointTrajectory_ex() merged_trajectory_with_type.header = trajectory_with_type[0].header merged_trajectory_with_type.joint_names = trajectory_with_type[0].joint_names # 初始化时间累加器 last_time_from_start = rospy.Duration(0) # 合并所有 trajectories 的 points for traj in trajectory_with_type: for point in traj.points: # 创建新的轨迹点 new_point = deepcopy(point) # 累加时间 new_point.time_from_start += last_time_from_start merged_trajectory_with_type.points.append(new_point) # 更新时间累加器为当前轨迹的最后一个点的时间 if traj.points: last_time_from_start = traj.points[-1].time_from_start + last_time_from_start return merged_trajectory_with_type class py_msgs(): fial=[] shun_xv=[] class point(): xyz_list=[] type=[] def ROS2PY_msgs(msgs, m_sr): for i in range(len(msgs.points)): py_msgs.point.xyz_list.append(msgs.points[i].positions) py_msgs.point.type.append(msgs.points[i].type) py_msgs.fail = m_sr.hf_fail.copy() py_msgs.shun_xv = msgs.points[0].sequence.copy() # for i in range(len(py_msgs.point.type)): # print(py_msgs.point.xyz_list[i]) # print(py_msgs.point.type[i]) message = { 'positions': py_msgs.point.xyz_list, # 规划路径点结果 'flags': py_msgs.point.type, # 规划路径点的类型,焊接点移动点 'weld_order': py_msgs.shun_xv, # 规划路径顺序 'failed': py_msgs.fail, # 规划路径失败的焊缝 } return json.dumps(message) # for i in range(len(py_msgs.point.type)): # moveit_server.move_j(py_msgs.point.xyz_list[i]) # #input("任意键继续") def get_valid_input(factor, default): while True: user_input=input(factor) if user_input=="": return default try: value=float(user_input) if 0